Skip to content

Commit

Permalink
Fixed tag hierarchy check (#670)
Browse files Browse the repository at this point in the history
This commit introduces a proper way to check if a `Token` derives from another
`Token` based on their tags. Before this commit, tags were compared using the
`startswith` method of the `str` class. However, this could lead to incorrect results
because tags are represented by numbers. Now, a dedicated `_is_parent_tag`
method has been introduced.

Co-authored-by: GlassOfWhiskey <[email protected]>
  • Loading branch information
LanderOtto and GlassOfWhiskey authored Feb 22, 2025
1 parent cd22647 commit 79c8509
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
4 changes: 2 additions & 2 deletions streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def process_embedded_tool(
step_name: str,
name_prefix: str,
context: MutableMapping[str, Any],
):
) -> tuple[cwl_utils.parser.Process, str, MutableMapping[str, Any]]:
run_command = cwl_element.run
inner_context = dict(context)
# If the `run` options contains an inline CWL object
Expand Down Expand Up @@ -740,7 +740,7 @@ def process_embedded_tool(
)
# Otherwise, the `run` options contains an URI
else:
# Fetch and translare the target file
# Fetch and translate the target file
run_command = cwl_utils.parser.load_document_by_uri(
path=cwl_element.loadingOptions.fetcher.urljoin(
base_url=cwl_element.loadingOptions.fileuri, url=run_command
Expand Down
37 changes: 21 additions & 16 deletions streamflow/workflow/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ def _get_directory(path_processor: ModuleType, directory: str | None, target: Ta
return directory or path_processor.join(target.workdir, utils.random_name())


def _get_token_ids(token_list):
return [t.persistent_id for t in (token_list or []) if t.persistent_id]


def _group_by_tag(
inputs: MutableMapping[str, Token],
inputs_map: MutableMapping[str, MutableMapping[str, Token]],
) -> None:
for name, token in inputs.items():
if token.tag not in inputs_map:
inputs_map[token.tag] = {}
inputs_map[token.tag][name] = token


def _is_parent_tag(tag: str, parent: str) -> bool:
parent_idx = parent.split(".")
return tag.split(".")[: len(parent_idx)] == parent_idx


def _reduce_statuses(statuses: MutableSequence[Status]):
num_skipped = 0
for status in statuses:
Expand All @@ -59,20 +78,6 @@ def _reduce_statuses(statuses: MutableSequence[Status]):
return Status.COMPLETED


def _group_by_tag(
inputs: MutableMapping[str, Token],
inputs_map: MutableMapping[str, MutableMapping[str, Token]],
) -> None:
for name, token in inputs.items():
if token.tag not in inputs_map:
inputs_map[token.tag] = {}
inputs_map[token.tag][name] = token


def _get_token_ids(token_list):
return [t.persistent_id for t in (token_list or []) if t.persistent_id]


class BaseStep(Step, ABC):
def __init__(self, name: str, workflow: Workflow):
super().__init__(name, workflow)
Expand Down Expand Up @@ -250,9 +255,9 @@ def _add_to_list(
for key in list(self._token_values.keys()):
if tag == key:
continue
elif key.startswith(tag):
elif _is_parent_tag(key, tag):
self._add_to_port(token, self._token_values[key], port_name)
elif tag.startswith(key):
elif _is_parent_tag(tag, key):
if tag not in self._token_values:
self._token_values[tag] = {}
for p in self._token_values[key]:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_remotepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async def test_mkdir_failure(context):
with pytest.raises(WorkflowExecutionException) as err:
await path.mkdir(mode=mode)
expected_msg_err = f"1 Command 'mkdir -m {mode:o} {path}' on location {location}: mkdir: can't create directory '{path}': File exists"
assert str(err.value) == expected_msg_err
assert expected_msg_err in str(err.value)


@pytest.mark.asyncio
Expand Down

0 comments on commit 79c8509

Please sign in to comment.