From 77a23ab1ef642b8b70fd3001eb8faf06c897c449 Mon Sep 17 00:00:00 2001 From: Sebastian Scherer Date: Mon, 2 Dec 2024 16:21:46 +0000 Subject: [PATCH] cleaned up unit and k8s tests for core component and pipeline test cases --- .../k8s/pipelines/pipeline/test_pipeline.py | 8 + .../pipelines/component/test_component.py | 223 ++++++++++++------ .../component/test_torch_ddp_component.py | 130 +--------- .../unit/pipelines/pipeline/test_pipeline.py | 21 +- 4 files changed, 176 insertions(+), 206 deletions(-) diff --git a/sdk/test/k8s/pipelines/pipeline/test_pipeline.py b/sdk/test/k8s/pipelines/pipeline/test_pipeline.py index d2571f0..04ecc49 100644 --- a/sdk/test/k8s/pipelines/pipeline/test_pipeline.py +++ b/sdk/test/k8s/pipelines/pipeline/test_pipeline.py @@ -28,6 +28,10 @@ def test_artifact_pipeline_decorator_and_register_and_run( """Registers and runs an example Pipeline passing artifacts across components.""" + # export pipeline to allow for manual checks + parameter_to_artifact_pipeline.export(test_output_dir) + + # register workflow template with argo server on k8s cluster parameter_to_artifact_pipeline.register() assert parameter_to_artifact_pipeline.registered @@ -57,6 +61,10 @@ def test_parameter_pipeline_decorator_and_register_and_run( """Registers and runs an example Pipeline passing parameters across components.""" + # export pipeline to allow for manual checks + adding_parameters_pipeline.export(test_output_dir) + + # register workflow template with argo server on k8s cluster adding_parameters_pipeline.register() assert adding_parameters_pipeline.registered diff --git a/sdk/test/unit/pipelines/component/test_component.py b/sdk/test/unit/pipelines/component/test_component.py index dab4e41..49500b8 100644 --- a/sdk/test/unit/pipelines/component/test_component.py +++ b/sdk/test/unit/pipelines/component/test_component.py @@ -11,8 +11,7 @@ OutputParameter, ) from bettmensch_ai.pipelines.pipeline_context import _pipeline_context -from hera.shared.serialization import MISSING -from hera.workflows import DAG, Parameter, WorkflowTemplate +from hera.workflows import DAG, Artifact, Parameter, WorkflowTemplate, models def test_component___init__(test_function_and_task_inputs): @@ -31,10 +30,10 @@ def test_component___init__(test_function_and_task_inputs): .set_gpus(1) ) - # validate addition of component to pipeline context + # --- validate addition of component to pipeline context assert test_component == _pipeline_context.components[0] - # validate component attributes + # --- validate component instance's attributes assert isinstance(test_component, Component) assert test_component.implementation == "standard" assert test_component.base_name == "test-name" @@ -48,41 +47,39 @@ def test_component___init__(test_function_and_task_inputs): assert test_component.custom_resources is None assert test_component.depends == "mock-component-0" - # validate component task_inputs - for task_input_name in ("a", "b", "c", "d"): - assert ( - test_component.task_inputs[task_input_name].name == task_input_name - ) - assert ( - test_component.task_inputs[task_input_name].owner == test_component - ) - assert ( - test_component.task_inputs[task_input_name].source - is test_task_inputs[task_input_name] - ) - - assert test_component.task_inputs["a"].value == test_task_inputs["a"].value - assert test_component.task_inputs["b"].value == test_task_inputs["b"].value - assert test_component.task_inputs["c"].value == MISSING - - # validate component template_inputs - assert list(test_component.template_inputs.keys()) == ["d"] - isinstance(test_component.template_inputs["d"], InputArtifact) - test_component.template_inputs["d"].name = "d" - - # validate component template_outputs - assert test_component.template_outputs["a_out"].owner == test_component - assert isinstance( - test_component.template_outputs["a_out"], OutputParameter - ) # noqa: E501 - assert test_component.template_outputs["b_out"].owner == test_component - assert isinstance(test_component.template_outputs["b_out"], OutputArtifact) + # --- validate component instance's io attributes + # note that InputParameter type argument are automatically injected by + # hera's script's `build_inputs` method, so arent being constructed here + # explicitly + assert test_component.template_inputs == { + "d": InputArtifact(name="d").set_owner(test_component), + } + + assert test_component.template_outputs == { + "a_out": OutputParameter(name="a_out").set_owner(test_component), + "b_out": OutputArtifact(name="b_out").set_owner(test_component), + } + + assert test_component.task_inputs == { + "a": InputParameter(name="a", value=1) + .set_owner(test_component) + .set_source(test_task_inputs["a"]), + "b": InputParameter(name="b", value=1) + .set_owner(test_component) + .set_source(test_task_inputs["b"]), + "c": InputParameter(name="c") + .set_owner(test_component) + .set_source(test_task_inputs["c"]), + "d": InputArtifact(name="d") + .set_owner(test_component) + .set_source(test_task_inputs["d"]), + } assert test_component.task_factory is None def test_component_decorator( - test_function_and_task_inputs, test_mock_pipeline, test_mock_component + test_function_and_task_inputs, ): """Tests of Component constructor.""" @@ -104,7 +101,7 @@ def test_component_decorator( # validate addition of component to pipeline context assert test_component == _pipeline_context.components[0] - # validate component attributes + # --- validate component instance's attributes assert isinstance(test_component, Component) assert test_component.implementation == "standard" assert test_component.base_name == "test-name" @@ -116,36 +113,35 @@ def test_component_decorator( assert test_component.gpus == 1 assert test_component.ephemeral is None assert test_component.custom_resources is None + assert test_component.depends == "mock-component-0" - # validate component task_inputs - for task_input_name in ("a", "b", "c", "d"): - assert ( - test_component.task_inputs[task_input_name].name == task_input_name - ) - assert ( - test_component.task_inputs[task_input_name].owner == test_component - ) - assert ( - test_component.task_inputs[task_input_name].source - is test_task_inputs[task_input_name] - ) - - assert test_component.task_inputs["a"].value == test_task_inputs["a"].value - assert test_component.task_inputs["b"].value == test_task_inputs["b"].value - assert test_component.task_inputs["c"].value == MISSING - - # validate component template_inputs - assert list(test_component.template_inputs.keys()) == ["d"] - isinstance(test_component.template_inputs["d"], InputArtifact) - test_component.template_inputs["d"].name = "d" - - # validate component template_outputs - assert test_component.template_outputs["a_out"].owner == test_component - assert isinstance( - test_component.template_outputs["a_out"], OutputParameter - ) # noqa: E501 - assert test_component.template_outputs["b_out"].owner == test_component - assert isinstance(test_component.template_outputs["b_out"], OutputArtifact) + # --- validate component instance's io attributes + # note that InputParameter type argument are automatically injected by + # hera's script's `build_inputs` method, so arent being constructed here + # explicitly + assert test_component.template_inputs == { + "d": InputArtifact(name="d").set_owner(test_component), + } + + assert test_component.template_outputs == { + "a_out": OutputParameter(name="a_out").set_owner(test_component), + "b_out": OutputArtifact(name="b_out").set_owner(test_component), + } + + assert test_component.task_inputs == { + "a": InputParameter(name="a", value=1) + .set_owner(test_component) + .set_source(test_task_inputs["a"]), + "b": InputParameter(name="b", value=1) + .set_owner(test_component) + .set_source(test_task_inputs["b"]), + "c": InputParameter(name="c") + .set_owner(test_component) + .set_source(test_task_inputs["c"]), + "d": InputArtifact(name="d") + .set_owner(test_component) + .set_source(test_task_inputs["d"]), + } assert test_component.task_factory is None @@ -207,11 +203,52 @@ def test_parameter_component_to_hera(test_output_dir, test_mock_pipeline): wft.to_file(test_output_dir) - task_names = [task.name for task in wft.templates[0].tasks] - assert task_names == ["a-plus-b-0", "a-plus-b-plus-2-0"] - - script_template_names = [template.name for template in wft.templates[1:]] - assert script_template_names == ["a-plus-b", "a-plus-b-plus-2"] + # --- validate `to_hera` outputs via argo.workflows.WorkflowTemplate + # instance + + # validate tasks + first_task = wft.templates[0].tasks[0] + assert first_task.name == "a-plus-b-0" + assert first_task.template.name == "a-plus-b" + assert first_task.arguments == [ + Parameter(name="a", value="{{inputs.parameters.a}}"), + Parameter(name="b", value="{{inputs.parameters.b}}"), + ] + assert first_task.depends == "" + + second_task = wft.templates[0].tasks[1] + assert second_task.name == "a-plus-b-plus-2-0" + assert second_task.template.name == "a-plus-b-plus-2" + assert second_task.arguments == [ + Parameter( + name="a", value="{{tasks.a-plus-b-0.outputs.parameters.sum}}" + ), + Parameter(name="b", value=2), + ] + assert second_task.depends == "a-plus-b-0" + + # validate script templates + first_script_template = wft.templates[1] + first_script_template.name == "a-plus-b" + first_script_template.inputs == [ + Parameter(name="a", value=1), + Parameter(name="b", value=2), + Parameter(name="sum", value=None), + ] + first_script_template.outputs == [ + Parameter(name="sum", value_from=models.ValueFrom(path="sum")), + ] + + second_script_template = wft.templates[2] + second_script_template.name == "a-plus-b-plus-2" + second_script_template.inputs == [ + Parameter(name="a", value=1), + Parameter(name="b", value=2), + Parameter(name="sum", value=None), + ] + second_script_template.outputs == [ + Parameter(name="sum", value_from=models.ValueFrom(path="sum")), + ] def test_artifact_component_to_hera( @@ -268,8 +305,46 @@ def test_artifact_component_to_hera( wft.to_file(test_output_dir) - task_names = [task.name for task in wft.templates[0].tasks] - assert task_names == ["convert-parameters-0", "show-artifacts-0"] - - script_template_names = [template.name for template in wft.templates[1:]] - assert script_template_names == ["convert-parameters", "show-artifacts"] + # --- validate `to_hera` outputs via argo.workflows.WorkflowTemplate + # instance + + # validate tasks + first_task = wft.templates[0].tasks[0] + assert first_task.name == "convert-parameters-0" + assert first_task.template.name == "convert-parameters" + assert first_task.arguments == [ + Parameter(name="a", value="{{inputs.parameters.a}}"), + ] + assert first_task.depends == "" + + second_task = wft.templates[0].tasks[1] + assert second_task.name == "show-artifacts-0" + assert second_task.template.name == "show-artifacts" + assert second_task.arguments == [ + Artifact( + name="a", + from_="{{tasks.convert-parameters-0.outputs.artifacts.a_art}}", + ), + ] + assert second_task.depends == "convert-parameters-0" + + # validate script templates + first_script_template = wft.templates[1] + first_script_template.name == "convert-parameters" + first_script_template.inputs == [ + Parameter(name="a"), + Parameter(name="a_art", value=None), + ] + first_script_template.outputs == [ + Artifact(name="a_art", path="a_art"), + ] + + second_script_template = wft.templates[2] + second_script_template.name == "show-artifacts" + second_script_template.inputs == [ + Artifact(name="a", path="a"), + Parameter(name="b", value=None), + ] + second_script_template.outputs == [ + Artifact(name="b", path="sum"), + ] diff --git a/sdk/test/unit/pipelines/component/test_torch_ddp_component.py b/sdk/test/unit/pipelines/component/test_torch_ddp_component.py index 250daaa..082f1ca 100644 --- a/sdk/test/unit/pipelines/component/test_torch_ddp_component.py +++ b/sdk/test/unit/pipelines/component/test_torch_ddp_component.py @@ -7,44 +7,15 @@ convert_to_artifact_torch_ddp_factory, show_parameter_torch_ddp_factory, ) -from bettmensch_ai.pipelines.io import ( - InputArtifact, - InputParameter, - OutputArtifact, - OutputParameter, -) +from bettmensch_ai.pipelines.io import InputParameter from bettmensch_ai.pipelines.pipeline_context import _pipeline_context -from hera.shared.serialization import MISSING from hera.workflows import DAG, Parameter, WorkflowTemplate -def test_torch_component___init__(test_mock_pipeline, test_mock_component): +def test_torch_component___init__(test_function_and_task_inputs): """Tests of Component constructor.""" - def test_function( - a: InputParameter, - b: InputParameter, - c: InputParameter, - d: InputArtifact, - a_out: OutputParameter, - b_out: OutputArtifact, - ): - pass - - test_input_a = InputParameter("fixed", 1) - test_input_b = InputParameter("mock_pipe_in", 1) - test_input_b.set_owner(test_mock_pipeline) - test_input_c = OutputParameter("mock_comp_out_param") - test_input_c.set_owner(test_mock_component) - test_input_d = OutputArtifact("mock_comp_out_art") - test_input_d.set_owner(test_mock_component) - - task_inputs = { - "a": test_input_a, - "b": test_input_b, - "c": test_input_c, - "d": test_input_d, - } + test_function, test_task_inputs = test_function_and_task_inputs with _pipeline_context: _pipeline_context.clear() @@ -57,7 +28,7 @@ def test_function( n_nodes=2, min_nodes=2, nproc_per_node=2, - **task_inputs + **test_task_inputs ) .set_cpu(0.5) .set_memory("100Mi") @@ -84,69 +55,14 @@ def test_function( assert test_component.ephemeral is None assert test_component.custom_resources is None - # validate component task_inputs - for task_input_name in ("a", "b", "c", "d"): - assert ( - test_component.task_inputs[task_input_name].name == task_input_name - ) - assert ( - test_component.task_inputs[task_input_name].owner == test_component - ) - assert ( - test_component.task_inputs[task_input_name].source - is task_inputs[task_input_name] - ) - - assert test_component.task_inputs["a"].value == task_inputs["a"].value - assert test_component.task_inputs["b"].value == task_inputs["b"].value - assert test_component.task_inputs["c"].value == MISSING - - # validate component template_inputs - assert list(test_component.template_inputs.keys()) == ["d"] - isinstance(test_component.template_inputs["d"], InputArtifact) - test_component.template_inputs["d"].name = "d" - - # validate component template_outputs - assert test_component.template_outputs["a_out"].owner == test_component - assert isinstance( - test_component.template_outputs["a_out"], OutputParameter - ) # noqa: E501 - assert test_component.template_outputs["b_out"].owner == test_component - assert isinstance(test_component.template_outputs["b_out"], OutputArtifact) - - assert test_component.task_factory is None - -def test_torch_component_decorator(test_mock_pipeline, test_mock_component): +def test_torch_component_decorator(test_function_and_task_inputs): """Tests of Component constructor.""" - def test_function( - a: InputParameter, - b: InputParameter, - c: InputParameter, - d: InputArtifact, - a_out: OutputParameter, - b_out: OutputArtifact, - ): - pass + test_function, test_task_inputs = test_function_and_task_inputs test_component_factory = as_torch_ddp_component(test_function) - test_input_a = InputParameter("fixed", 1) - test_input_b = InputParameter("mock_pipe_in", 1) - test_input_b.set_owner(test_mock_pipeline) - test_input_c = OutputParameter("mock_comp_out_param") - test_input_c.set_owner(test_mock_component) - test_input_d = OutputArtifact("mock_comp_out_art") - test_input_d.set_owner(test_mock_component) - - task_inputs = { - "a": test_input_a, - "b": test_input_b, - "c": test_input_c, - "d": test_input_d, - } - with _pipeline_context: _pipeline_context.clear() @@ -157,7 +73,7 @@ def test_function( min_nodes=2, nproc_per_node=2, name="test_name", - **task_inputs + **test_task_inputs ) .set_cpu(0.5) .set_memory("100Mi") @@ -183,38 +99,6 @@ def test_function( assert test_component.ephemeral is None assert test_component.custom_resources is None - # validate component task_inputs - for task_input_name in ("a", "b", "c", "d"): - assert ( - test_component.task_inputs[task_input_name].name == task_input_name - ) - assert ( - test_component.task_inputs[task_input_name].owner == test_component - ) - assert ( - test_component.task_inputs[task_input_name].source - is task_inputs[task_input_name] - ) - - assert test_component.task_inputs["a"].value == task_inputs["a"].value - assert test_component.task_inputs["b"].value == task_inputs["b"].value - assert test_component.task_inputs["c"].value == MISSING - - # validate component template_inputs - assert list(test_component.template_inputs.keys()) == ["d"] - isinstance(test_component.template_inputs["d"], InputArtifact) - test_component.template_inputs["d"].name = "d" - - # validate component template_outputs - assert test_component.template_outputs["a_out"].owner == test_component - assert isinstance( - test_component.template_outputs["a_out"], OutputParameter - ) # noqa: E501 - assert test_component.template_outputs["b_out"].owner == test_component - assert isinstance(test_component.template_outputs["b_out"], OutputArtifact) - - assert test_component.task_factory is None - def test_parameter_torch_component_to_hera( test_output_dir, test_mock_pipeline diff --git a/sdk/test/unit/pipelines/pipeline/test_pipeline.py b/sdk/test/unit/pipelines/pipeline/test_pipeline.py index 020fe8d..eaa54d0 100644 --- a/sdk/test/unit/pipelines/pipeline/test_pipeline.py +++ b/sdk/test/unit/pipelines/pipeline/test_pipeline.py @@ -11,13 +11,17 @@ def test_artifact_pipeline( ): """Declaration of Pipeline using InputArtifact and OutputArtifact""" + # export pipeline to allow for manual checks + parameter_to_artifact_pipeline.export(test_output_dir) + + # --- validate pipeline instance's basic attributes assert parameter_to_artifact_pipeline.built assert not parameter_to_artifact_pipeline.registered assert parameter_to_artifact_pipeline.registered_id is None assert parameter_to_artifact_pipeline.registered_name is None assert parameter_to_artifact_pipeline.registered_namespace is None - # --- validate class' io attributes + # --- validate pipeline instance's io attributes assert parameter_to_artifact_pipeline.inputs == { "a": InputParameter(name="a", value="Param A").set_owner( parameter_to_artifact_pipeline @@ -35,7 +39,7 @@ def test_artifact_pipeline( .set_source(parameter_to_artifact_pipeline.inputs["a"]), } - # --- validate class' `user_built_workflow_template` attribute + # --- validate pipeline instance's `user_built_workflow_template` attribute wft = parameter_to_artifact_pipeline.user_built_workflow_template # validate spec @@ -75,20 +79,21 @@ def test_artifact_pipeline( Parameter(name="a", value="{{workflow.parameters.a}}"), ] - parameter_to_artifact_pipeline.export(test_output_dir) - def test_parameter_pipeline(test_output_dir): """Declaration of Pipeline using InputParameter and OutputParameter""" - # --- validate class' basic attributes + # export pipeline to allow for manual checks + adding_parameters_pipeline.export(test_output_dir) + + # --- validate pipeline instance's basic attributes assert adding_parameters_pipeline.built assert not adding_parameters_pipeline.registered assert adding_parameters_pipeline.registered_id is None assert adding_parameters_pipeline.registered_name is None assert adding_parameters_pipeline.registered_namespace is None - # --- validate class' io attributes + # --- validate pipeline instance's io attributes assert adding_parameters_pipeline.inputs == { "a": InputParameter(name="a", value=1).set_owner( adding_parameters_pipeline @@ -112,7 +117,7 @@ def test_parameter_pipeline(test_output_dir): .set_source(adding_parameters_pipeline.inputs["b"]), } - # --- validate class' `user_built_workflow_template` attribute + # --- validate pipeline instance's `user_built_workflow_template` attribute wft = adding_parameters_pipeline.user_built_workflow_template # validate spec @@ -159,5 +164,3 @@ def test_parameter_pipeline(test_output_dir): Parameter(name="a", value="{{workflow.parameters.a}}"), Parameter(name="b", value="{{workflow.parameters.b}}"), ] - - adding_parameters_pipeline.export(test_output_dir)