Pipeline for Importer Components
diegolovison committed Aug 14, 2024
1 parent 079ccc8 commit 8771a81
Showing 2 changed files with 170 additions and 0 deletions.
@@ -0,0 +1,44 @@
from kfp import compiler, dsl
from kfp.dsl import Dataset, Input, Output


common_base_image = "registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61"


@dsl.component(
    base_image=common_base_image,
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool
):
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler, StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop("Labels")

    scaler = StandardScaler() if standard_scaler else MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df["Labels"] = labels
    normalized_iris_dataset.metadata["state"] = "Normalized"
    with open(normalized_iris_dataset.path, "w") as f:
        df.to_csv(f)


@dsl.pipeline(name="my-pipe")
def my_pipeline(artifact_uri: str, standard_scaler: bool = True):
    # Bring an artifact that already exists at `artifact_uri` into the
    # pipeline as a Dataset; `reimport=True` re-imports it on every run
    # instead of reusing a previously imported artifact.
    importer_task = dsl.importer(
        artifact_uri=artifact_uri,
        artifact_class=dsl.Dataset,
        reimport=True,
    )
    normalize_dataset(input_iris_dataset=importer_task.output, standard_scaler=standard_scaler)


if __name__ == "__main__":
    compiler.Compiler().compile(my_pipeline, package_path=__file__.replace(".py", "_compiled.yaml"))
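
For context (not part of this commit): the YAML that the script above emits can be submitted with the KFP SDK client. A minimal sketch, assuming a reachable Kubeflow Pipelines endpoint; the host, file name, and artifact URI below are placeholders:

from kfp.client import Client

client = Client(host="https://my-kfp-endpoint.example.com")  # placeholder endpoint
run = client.create_run_from_pipeline_package(
    "my_pipeline_compiled.yaml",  # placeholder; follows the package_path pattern above
    arguments={
        "artifact_uri": "s3://my-bucket/iris.csv",  # placeholder; the CSV must have a "Labels" column
        "standard_scaler": True,
    },
)

The second file in the diff is the compiled pipeline spec produced by the compile step.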

@@ -0,0 +1,126 @@
# PIPELINE DEFINITION
# Name: my-pipe
# Inputs:
#    artifact_uri: str
#    standard_scaler: bool [Default: True]
components:
  comp-importer:
    executorLabel: exec-importer
    inputDefinitions:
      parameters:
        uri:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        artifact:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
  comp-normalize-dataset:
    executorLabel: exec-normalize-dataset
    inputDefinitions:
      artifacts:
        input_iris_dataset:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        standard_scaler:
          parameterType: BOOLEAN
    outputDefinitions:
      artifacts:
        normalized_iris_dataset:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-importer:
      importer:
        artifactUri:
          runtimeParameter: uri
        reimport: true
        typeSchema:
          schemaTitle: system.Dataset
          schemaVersion: 0.0.1
    exec-normalize-dataset:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - normalize_dataset
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\
          \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path
          "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef normalize_dataset(\n    input_iris_dataset: Input[Dataset],\n\
          \    normalized_iris_dataset: Output[Dataset],\n    standard_scaler: bool\n\
          ):\n    import pandas as pd\n    from sklearn.preprocessing import MinMaxScaler,\
          \ StandardScaler\n\n    with open(input_iris_dataset.path) as f:\n      \
          \  df = pd.read_csv(f)\n    labels = df.pop(\"Labels\")\n\n    scaler =\
          \ StandardScaler() if standard_scaler else MinMaxScaler()\n\n    df = pd.DataFrame(scaler.fit_transform(df))\n\
          \    df[\"Labels\"] = labels\n    normalized_iris_dataset.metadata[\"state\"\
          ] = \"Normalized\"\n    with open(normalized_iris_dataset.path, \"w\") as\
          \ f:\n        df.to_csv(f)\n\n"
        image: registry.redhat.io/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61
pipelineInfo:
  name: my-pipe
root:
  dag:
    tasks:
      importer:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-importer
        inputs:
          parameters:
            uri:
              componentInputParameter: artifact_uri
        taskInfo:
          name: importer
      normalize-dataset:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-normalize-dataset
        dependentTasks:
        - importer
        inputs:
          artifacts:
            input_iris_dataset:
              taskOutputArtifact:
                outputArtifactKey: artifact
                producerTask: importer
          parameters:
            standard_scaler:
              componentInputParameter: standard_scaler
        taskInfo:
          name: normalize-dataset
  inputDefinitions:
    parameters:
      artifact_uri:
        parameterType: STRING
      standard_scaler:
        defaultValue: true
        isOptional: true
        parameterType: BOOLEAN
schemaVersion: 2.1.0
sdkVersion: kfp-2.8.0
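
Note how the compiled importer declares its URI as a runtime parameter (artifactUri.runtimeParameter: uri) rather than a constant, so the pipeline-level artifact_uri input is resolved only at run time, and reimport: true re-imports the artifact on every run instead of reusing a previously imported one. A quick sanity check over the compiled spec, sketched with PyYAML (the file name is hypothetical):

import yaml

# File name is hypothetical; use whatever the compile step actually produced.
with open("my_pipeline_compiled.yaml") as f:
    spec = yaml.safe_load(f)

importer = spec["deploymentSpec"]["executors"]["exec-importer"]["importer"]
assert importer["artifactUri"] == {"runtimeParameter": "uri"}  # URI bound at run time
assert importer["reimport"] is True  # always re-import, never reuse a cached artifact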
