Skip to content

Commit

Permalink
fix(workflow): failure when re-/executing a subset of workflow file s…
Browse files Browse the repository at this point in the history
…teps (#3263)

* fix(workflow): failure when executing a subset of workflow file steps
* fix(workflow): chaining error when re-executing workflow files
  • Loading branch information
mohammad-alisafaee authored Jan 17, 2023
1 parent d81a487 commit 7d2094e
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 12 deletions.
3 changes: 3 additions & 0 deletions renku/core/workflow/model/workflow_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def to_command_parameter(self, plan_id: str, index: int) -> CommandParameter:
description=self.description,
id=CommandParameter.generate_id(plan_id=plan_id, name=self.name, postfix=postfix),
name=self.name,
name_set_by_user=self.name_set_by_user,
position=self.position,
postfix=postfix,
prefix=self.prefix,
Expand Down Expand Up @@ -312,6 +313,7 @@ def to_command_input(self, plan_id: str, index: int) -> CommandInput:
id=CommandInput.generate_id(plan_id=plan_id, name=self.name, postfix=postfix),
mapped_to=MappedIOStream.from_str(self.mapped_to) if self.mapped_to else None,
name=self.name,
name_set_by_user=self.name_set_by_user,
position=self.position,
postfix=postfix,
prefix=self.prefix,
Expand All @@ -336,6 +338,7 @@ def to_command_output(self, plan_id: str, index: int) -> CommandOutput:
id=CommandOutput.generate_id(plan_id=plan_id, name=self.name, postfix=postfix),
mapped_to=MappedIOStream.from_str(self.mapped_to) if self.mapped_to else None,
name=self.name,
name_set_by_user=self.name_set_by_user,
position=self.position,
postfix=postfix,
prefix=self.prefix,
Expand Down
6 changes: 4 additions & 2 deletions renku/core/workflow/workflow_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ def filter_steps(workflow: WorkflowFileCompositePlan, steps: List[str]) -> List[
return [s for s in workflow.plans if s.unqualified_name in selected_steps]


def get_all_workflow_file_inputs_and_outputs(workflow_file: WorkflowFile) -> List[str]:
def get_workflow_file_inputs_and_outputs(workflow_file: WorkflowFile, steps: List[str]) -> List[str]:
"""Return a list of all inputs and outputs that must be committed."""
return [io.path for step in workflow_file.steps for io in itertools.chain(step.inputs, step.outputs) if io.persist]
selected_steps = [s for s in workflow_file.steps if s.name in steps] if steps else workflow_file.steps

return [io.path for step in selected_steps for io in itertools.chain(step.inputs, step.outputs) if io.persist]
30 changes: 27 additions & 3 deletions renku/domain_model/workflow/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,18 @@ def generate_id(stream_type: str) -> str:
class CommandParameterBase:
"""Represents a parameter for a Plan."""

# NOTE: This attribute is only used by workflow-file machinery to check if plans are the same or not. We need it,
# because names are generated randomly when not set by users which make the comparison return incorrect result.
name_set_by_user: bool = False

def __init__(
self,
*,
default_value: Any,
description: Optional[str],
id: str,
name: Optional[str],
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: Optional[str] = None,
derived_from: Optional[str] = None,
Expand All @@ -90,6 +95,7 @@ def __init__(
self.derived_from: Optional[str] = derived_from
# NOTE: ``postfix`` is used only to generate a nicer ``id`` for a parameter. Its value isn't used anywhere else.
self.postfix: Optional[str] = postfix
self.name_set_by_user: bool = name_set_by_user

if name is not None:
self.name: str = name
Expand Down Expand Up @@ -132,10 +138,13 @@ def role(self) -> str:
@staticmethod
def _get_equality_attributes() -> List[str]:
"""Return a list of attributes values that determine if instances are equal."""
return ["name", "description", "default_value", "prefix", "position"]
# NOTE: We treat name differently
return ["description", "default_value", "prefix", "position"]

def is_equal_to(self, other) -> bool:
"""Return if attributes that cause a change in the parameter, are the same."""
if self.name_set_by_user != other.name_set_by_user or (self.name_set_by_user and self.name != other.name):
return False
return all(getattr(self, a) == getattr(other, a) for a in self._get_equality_attributes())

def to_argv(self, quote_string: bool = True) -> List[Any]:
Expand Down Expand Up @@ -193,6 +202,7 @@ def __init__(
description: str = None,
id: str,
name: str = None,
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: str = None,
derived_from: str = None,
Expand All @@ -203,6 +213,7 @@ def __init__(
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
position=position,
prefix=prefix,
derived_from=derived_from,
Expand Down Expand Up @@ -245,6 +256,7 @@ def __init__(
id: str,
mapped_to: Optional[MappedIOStream] = None,
name: Optional[str] = None,
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: Optional[str] = None,
encoding_format: Optional[List[str]] = None,
Expand All @@ -258,6 +270,7 @@ def __init__(
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
position=position,
prefix=prefix,
derived_from=derived_from,
Expand Down Expand Up @@ -323,6 +336,7 @@ def __init__(
id: str,
mapped_to: Optional[MappedIOStream] = None,
name: Optional[str] = None,
name_set_by_user: bool = False,
position: Optional[int] = None,
prefix: Optional[str] = None,
encoding_format: Optional[List[str]] = None,
Expand All @@ -336,6 +350,7 @@ def __init__(
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
position=position,
prefix=prefix,
derived_from=derived_from,
Expand Down Expand Up @@ -381,7 +396,8 @@ def is_equal_to(self, other) -> bool:
@staticmethod
def _get_equality_attributes() -> List[str]:
"""Return a list of attributes values that determine if instances are equal."""
return CommandParameterBase._get_equality_attributes() + ["encoding_format", "create_folder"]
# NOTE: Don't include ``create_folder`` in comparison since its value is state-dependent
return CommandParameterBase._get_equality_attributes() + ["encoding_format"]

def derive(self, plan_id: str) -> "CommandOutput":
"""Create a new ``CommandOutput`` that is derived from self."""
Expand All @@ -400,10 +416,18 @@ def __init__(
description: Optional[str] = None,
id: str,
name: Optional[str] = None,
name_set_by_user: bool = False,
mapped_parameters: List[CommandParameterBase],
**kwargs,
):
super().__init__(default_value=default_value, description=description, id=id, name=name, **kwargs)
super().__init__(
default_value=default_value,
description=description,
id=id,
name=name,
name_set_by_user=name_set_by_user,
**kwargs,
)

self.mapped_parameters: List[CommandParameterBase] = mapped_parameters

Expand Down
9 changes: 6 additions & 3 deletions renku/ui/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@
from renku.core import errors
from renku.core.plugin.workflow_file_parser import read_workflow_file
from renku.core.util.os import is_subpath
from renku.core.workflow.workflow_file import get_all_workflow_file_inputs_and_outputs
from renku.core.workflow.workflow_file import get_workflow_file_inputs_and_outputs
from renku.domain_model.project_context import project_context
from renku.ui.cli.utils.callback import ClickCallback
from renku.ui.cli.utils.plugins import available_workflow_providers
Expand Down Expand Up @@ -594,6 +594,7 @@ def is_workflow_file() -> bool:
communicator.warn("All flags other than '--file', '--verbose', '--dry-run', and 'no-commit' are ignored")

path = command_line[0]
steps = command_line[1:]
no_commit = no_commit or dry_run

# NOTE: Read the workflow file to get list of generated files that should be committed
Expand All @@ -603,7 +604,9 @@ def is_workflow_file() -> bool:
else:
workflow_file = read_workflow_file(path=path, parser="renku")
commit_only = (
[path] + get_all_workflow_file_inputs_and_outputs(workflow_file) + [str(project_context.metadata_path)]
[path]
+ get_workflow_file_inputs_and_outputs(workflow_file=workflow_file, steps=steps)
+ [str(project_context.metadata_path)]
)

provider = provider or "local"
Expand All @@ -612,7 +615,7 @@ def is_workflow_file() -> bool:
run_workflow_file_command(no_commit=no_commit, commit_only=commit_only)
.with_communicator(communicator)
.build()
.execute(path=path, steps=command_line[1:], dry_run=dry_run, workflow_file=workflow_file, provider=provider)
.execute(path=path, steps=steps, dry_run=dry_run, workflow_file=workflow_file, provider=provider)
)

if dry_run:
Expand Down
46 changes: 42 additions & 4 deletions tests/cli/test_workflow_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ def test_dry_run_workflow_file(runner, workflow_file_project):

def test_run_workflow_file_with_selected_steps(runner, workflow_file_project):
"""Test running a sub-set of steps of a workflow file."""
result = runner.invoke(cli, ["run", "--dry-run", workflow_file_project.workflow_file, "head", "line-count"])
result = runner.invoke(cli, ["run", workflow_file_project.workflow_file, "head", "tail"])
assert 0 == result.exit_code, format_result_exception(result)

assert "Will execute step 'head': head $n $models $colors > $temporary-result" in result.output
assert "Will execute step 'tail': tail $parameters intermediate > results/output.csv" not in result.output
assert "Will execute step 'line-count': wc -l $models-and-colors > $output" in result.output
assert "Executing step 'workflow-file.head':" in result.output
assert "Executing step 'workflow-file.tail':" in result.output
assert "Executing step 'workflow-file.line-count':" not in result.output

# Third step's output isn't created
assert not (workflow_file_project.path / "results" / "output.csv.wc").exists()


def test_run_workflow_file_with_no_commit(runner, workflow_file_project):
Expand Down Expand Up @@ -357,6 +360,41 @@ def test_workflow_file_plan_versioning(runner, workflow_file_project, with_injec
assert line_count_3.derived_from is None


def test_workflow_file_plan_versioning_with_selected_steps(runner, workflow_file_project, with_injection):
"""Test plans are versioned correctly when executing subsets of steps."""
result = runner.invoke(cli, ["run", workflow_file_project.workflow_file, "head", "tail"])
assert 0 == result.exit_code, format_result_exception(result)
time.sleep(1)

with with_injection():
plan_gateway = PlanGateway()
root_plan_1 = plan_gateway.get_by_name("workflow-file")
head_1 = plan_gateway.get_by_name("workflow-file.head")
tail_1 = plan_gateway.get_by_name("workflow-file.tail")
line_count_1 = plan_gateway.get_by_name("workflow-file.line-count")

result = runner.invoke(cli, ["run", workflow_file_project.workflow_file])
assert 0 == result.exit_code, format_result_exception(result)

time.sleep(1)

with with_injection():
plan_gateway = PlanGateway()
root_plan_2 = plan_gateway.get_by_name("workflow-file")
head_2 = plan_gateway.get_by_name("workflow-file.head")
tail_2 = plan_gateway.get_by_name("workflow-file.tail")
line_count_2 = plan_gateway.get_by_name("workflow-file.line-count")

# Plan `line-count` wasn't executed in the first run
assert line_count_1 is None
assert line_count_2 is not None

# Everything else is the same
assert root_plan_2.id == root_plan_1.id
assert head_2.id == head_1.id
assert tail_2.id == tail_1.id


def test_duplicate_workflow_file_plan_name(runner, workflow_file_project):
"""Test workflow file execution fails if a plan with the same name exists."""
workflow_file_project.repository.add(all=True)
Expand Down

0 comments on commit 7d2094e

Please sign in to comment.