diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index be30edd91..09ed404e7 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -709,7 +709,7 @@ def process_embedded_tool( step_name: str, name_prefix: str, context: MutableMapping[str, Any], -): +) -> tuple[cwl_utils.parser.Process, str, MutableMapping[str, Any]]: run_command = cwl_element.run inner_context = dict(context) # If the `run` options contains an inline CWL object @@ -740,7 +740,7 @@ def process_embedded_tool( ) # Otherwise, the `run` options contains an URI else: - # Fetch and translare the target file + # Fetch and translate the target file run_command = cwl_utils.parser.load_document_by_uri( path=cwl_element.loadingOptions.fetcher.urljoin( base_url=cwl_element.loadingOptions.fileuri, url=run_command diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py index e871034c2..e00c83da2 100644 --- a/streamflow/workflow/step.py +++ b/streamflow/workflow/step.py @@ -44,6 +44,20 @@ def _get_directory(path_processor: ModuleType, directory: str | None, target: Ta return directory or path_processor.join(target.workdir, utils.random_name()) +def _get_token_ids(token_list): + return [t.persistent_id for t in (token_list or []) if t.persistent_id] + + +def _group_by_tag( + inputs: MutableMapping[str, Token], + inputs_map: MutableMapping[str, MutableMapping[str, Token]], +) -> None: + for name, token in inputs.items(): + if token.tag not in inputs_map: + inputs_map[token.tag] = {} + inputs_map[token.tag][name] = token + + def _reduce_statuses(statuses: MutableSequence[Status]): num_skipped = 0 for status in statuses: @@ -59,18 +73,9 @@ def _reduce_statuses(statuses: MutableSequence[Status]): return Status.COMPLETED -def _group_by_tag( - inputs: MutableMapping[str, Token], - inputs_map: MutableMapping[str, MutableMapping[str, Token]], -) -> None: - for name, token in inputs.items(): - if token.tag not in inputs_map: - inputs_map[token.tag] = {} - inputs_map[token.tag][name] = token - - -def _get_token_ids(token_list): - return [t.persistent_id for t in (token_list or []) if t.persistent_id] +def _starts_with_tag(tag: str, sub_tag: str) -> bool: + sublist = sub_tag.split(".") + return tag.split(".")[: len(sublist)] == sublist class BaseStep(Step, ABC): @@ -250,9 +255,9 @@ def _add_to_list( for key in list(self._token_values.keys()): if tag == key: continue - elif key.startswith(tag): + elif _starts_with_tag(key, tag): self._add_to_port(token, self._token_values[key], port_name) - elif tag.startswith(key): + elif _starts_with_tag(tag, key): if tag not in self._token_values: self._token_values[tag] = {} for p in self._token_values[key]: diff --git a/tests/test_remotepath.py b/tests/test_remotepath.py index a54a61907..967ee525d 100644 --- a/tests/test_remotepath.py +++ b/tests/test_remotepath.py @@ -155,7 +155,7 @@ async def test_mkdir_failure(context): with pytest.raises(WorkflowExecutionException) as err: await path.mkdir(mode=mode) expected_msg_err = f"1 Command 'mkdir -m {mode:o} {path}' on location {location}: mkdir: can't create directory '{path}': File exists" - assert str(err.value) == expected_msg_err + assert expected_msg_err in str(err.value) @pytest.mark.asyncio