Fixed some CWL translator issues (#628)
This commit fixes several CWL translator issues:
- Fix the retrieval of step names from `cwl-utils` objects. Before
  this commit, the `out` attribute of a `WorkflowStep` object was always
  expected to be a list of strings. Instead, it should be treated as a list of
  `Any`, following the type hint of the `cwl_utils.parser.WorkflowStep`
  constructor; in particular, it can contain `WorkflowStepOutput` objects.
- Fix the `CommandOutputProcessor` creation when a schema is defined.
  Before this commit, `CommandInputRecordSchema` was not included in the
  `RecordSchema` case, and the same held for the `EnumSchema` and `ArraySchema`
  cases.
- Fix `optional` inputs inside a scatter step
- Fix the `_get_path` method in the CWL translator module. When the `id` field
  is explicitly defined in a CWL file, the `element_id` of the corresponding Python
  object is constructed as `/path/to/file#id`, so the path is the portion before
  the `#` fragment (see the sketch after this list)
- Fix the configuration file's path stored in the `DataManager`. Before this commit,
  the path was stored exactly as written in the StreamFlow file, which could be a
  relative path
- Fix `optional` inputs of type `list` or `record`. Before this commit, some workflows
  failed because their input data were erroneously treated as mandatory
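
As a minimal illustration of the `_get_path` fix, the function body below paraphrases the patched hunk in `streamflow/cwl/translator.py`; the example `element_id` value is made up:

import urllib.parse

def _get_path(element_id: str) -> str:
    # Keep the part before the CWL fragment identifier, then strip the
    # file:// scheme if present (mirrors the patched translator code).
    path = element_id
    if "#" in path:
        path = path.split("#")[0]
    if path.startswith("file://"):
        path = urllib.parse.unquote(path[7:])
    return path

# With an explicitly defined `id`, the element_id carries a fragment:
print(_get_path("file:///path/to/workflow.cwl#main"))  # -> /path/to/workflow.cwl
# The previous split("#")[-1] would have returned "main" instead of the file path.
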
LanderOtto authored Jan 19, 2025
1 parent cff8eb2 commit 5cad7bf
Showing 6 changed files with 119 additions and 120 deletions.
2 changes: 1 addition & 1 deletion streamflow/core/scheduling.py
@@ -435,7 +435,7 @@ def add_path(self, path: str):
self.paths.add(path)

def __repr__(self):
return f"Storage(mount_point={self.mount_point}, size={self.size}, paths={self.paths})"
return f"Storage(mount_point={self.mount_point}, size={self.size}, bind={self.bind}, paths={self.paths})"

def __add__(self, other: Any) -> Storage:
if not isinstance(other, Storage):
4 changes: 2 additions & 2 deletions streamflow/core/workflow.py
@@ -6,7 +6,7 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import MutableMapping, MutableSequence
from enum import Enum
from enum import IntEnum
from typing import TYPE_CHECKING, TypeVar, cast

from streamflow.core import utils
@@ -205,7 +205,7 @@ async def save(self, context: StreamFlowContext) -> None:
)


class Status(Enum):
class Status(IntEnum):
WAITING = 0
FIREABLE = 1
RUNNING = 2
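
For context on the `Status(Enum)` to `Status(IntEnum)` change above, a minimal sketch of the behavioral difference (illustrative only; the commit message does not state the motivation): `IntEnum` members compare as plain integers, whereas plain `Enum` members do not.

from enum import Enum, IntEnum

class OldStatus(Enum):
    WAITING = 0
    RUNNING = 2

class NewStatus(IntEnum):
    WAITING = 0
    RUNNING = 2

assert NewStatus.WAITING == 0                 # IntEnum compares equal to int values
assert NewStatus.WAITING < NewStatus.RUNNING  # ...and supports ordering
assert OldStatus.WAITING != 0                 # plain Enum members are not ints
# OldStatus.WAITING < OldStatus.RUNNING would raise TypeError
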
102 changes: 42 additions & 60 deletions streamflow/cwl/translator.py
@@ -7,7 +7,7 @@
import urllib.parse
from collections.abc import MutableMapping, MutableSequence
from enum import Enum
from pathlib import PurePosixPath
from pathlib import Path, PurePosixPath
from types import ModuleType
from typing import Any, cast, get_args

@@ -82,7 +82,12 @@
OnlyNonNullTransformer,
ValueFromTransformer,
)
from streamflow.cwl.utils import LoadListing, SecondaryFile, resolve_dependencies
from streamflow.cwl.utils import (
LoadListing,
SecondaryFile,
process_embedded_tool,
resolve_dependencies,
)
from streamflow.cwl.workflow import CWLWorkflow
from streamflow.deployment.utils import get_binding_config
from streamflow.log_handler import logger
@@ -250,10 +255,12 @@ def _create_command_output_processor(
port_target: Target | None,
port_type: (
str
| cwl_utils.parser.InputSchema
| cwl_utils.parser.OutputSchema
| MutableSequence[
str,
cwl_utils.parser.OutputSchema,
cwl_utils.parser.InputSchema,
]
),
cwl_element: (
@@ -267,7 +274,7 @@
optional: bool = False,
) -> CommandOutputProcessor:
# Array type: -> MapCommandOutputProcessor
if isinstance(port_type, get_args(cwl_utils.parser.OutputArraySchema)):
if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)):
return CWLMapCommandOutputProcessor(
name=port_name,
workflow=workflow,
@@ -284,7 +291,7 @@
),
)
# Enum type: -> create command output processor
elif isinstance(port_type, get_args(cwl_utils.parser.OutputEnumSchema)):
elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)):
# Process InlineJavascriptRequirement
requirements = context["hints"] | context["requirements"]
expression_lib, full_js = _process_javascript_requirement(requirements)
@@ -312,7 +319,7 @@
optional=optional,
)
# Record type: -> ObjectCommandOutputProcessor
elif isinstance(port_type, get_args(cwl_utils.parser.OutputRecordSchema)):
elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)):
# Process InlineJavascriptRequirement
requirements = context["hints"] | context["requirements"]
expression_lib, full_js = _process_javascript_requirement(requirements)
@@ -622,6 +629,7 @@ def _create_token_processor(
force_deep_listing=force_deep_listing,
only_propagate_secondary_files=only_propagate_secondary_files,
),
optional=optional,
)
# Enum type: -> create output processor
elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)):
@@ -679,6 +687,7 @@ )
)
for port_type in port_type.fields
},
optional=optional,
)
elif isinstance(port_type, MutableSequence):
optional = "null" in port_type
@@ -1075,7 +1084,7 @@ def _get_load_listing(
def _get_path(element_id: str) -> str:
path = element_id
if "#" in path:
path = path.split("#")[-1]
path = path.split("#")[0]
if path.startswith("file://"):
path = urllib.parse.unquote(path[7:])
return path
@@ -1406,6 +1415,9 @@ def __init__(
| cwl_utils.parser.Workflow
) = cwl_definition
self.cwl_inputs: MutableMapping[str, Any] = cwl_inputs

if cwl_inputs_path is not None:
cwl_inputs_path = _get_path(Path(cwl_inputs_path).resolve().as_uri())
self.cwl_inputs_path: str | None = cwl_inputs_path
self.default_map: MutableMapping[str, Any] = {}
self.deployment_map: MutableMapping[str, DeployStep] = {}
@@ -1504,6 +1516,7 @@ def _handle_optional_input_variables(
cwl_element: cwl_utils.parser.WorkflowStep,
inner_cwl_element: cwl_utils.parser.Process,
cwl_name_prefix: str,
inner_cwl_name_prefix: str,
default_ports: MutableMapping[str, Port],
name_prefix: str,
step_name: str,
@@ -1512,17 +1525,13 @@
inner_input_ports, outer_input_ports = set(), set()
# Get inner CWL object input names
for element_input in inner_cwl_element.inputs:
inner_cwl_name_prefix = utils.get_name(
name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True
)
global_name = utils.get_name(
step_name, inner_cwl_name_prefix, element_input.id
)
port_name = posixpath.relpath(global_name, step_name)
inner_input_ports.add(port_name)
# Get WorkflowStep input names
for element_input in cwl_element.in_:
step_name = utils.get_name(name_prefix, cwl_name_prefix, cwl_element.id)
cwl_step_name = utils.get_name(
name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True
)
@@ -2113,9 +2122,30 @@ def _translate_workflow_step(
utils.get_name(step_name, cwl_step_name, n)
for n in cwl_element.scatter or []
]

# Process inner element
run_command, inner_cwl_name_prefix, inner_context = process_embedded_tool(
cwl_element=cwl_element,
step_name=step_name,
name_prefix=name_prefix,
cwl_name_prefix=cwl_name_prefix,
context=context,
)

# Handle optional input variables
default_ports = {}
self._handle_optional_input_variables(
cwl_element=cwl_element,
inner_cwl_element=run_command,
cwl_name_prefix=cwl_name_prefix,
inner_cwl_name_prefix=inner_cwl_name_prefix,
default_ports=default_ports,
name_prefix=name_prefix,
step_name=step_name,
workflow=workflow,
)
# Process inputs
input_ports = {}
default_ports = {}
value_from_transformers = {}
input_dependencies = {}
for element_input in cwl_element.in_:
@@ -2558,62 +2588,14 @@ def _translate_workflow_step(
)
# Update output ports with the internal ones
self.output_ports |= internal_output_ports
# Process inner element
run_command = cwl_element.run
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = utils.get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = utils.get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
utils.get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
context = {**context, **{"version": run_command.cwlVersion}}
self._recursive_translate(
workflow=workflow,
cwl_element=run_command,
context=context
context=inner_context
| {"requirements": {k: v for k, v in requirements.items() if k != "Loop"}},
name_prefix=step_name,
cwl_name_prefix=inner_cwl_name_prefix,
)
# Handle optional input variables
self._handle_optional_input_variables(
cwl_element=cwl_element,
inner_cwl_element=run_command,
cwl_name_prefix=cwl_name_prefix,
default_ports=default_ports,
name_prefix=name_prefix,
step_name=step_name,
workflow=workflow,
)
# Update output ports with the external ones
self.output_ports |= external_output_ports

56 changes: 53 additions & 3 deletions streamflow/cwl/utils.py
@@ -12,6 +12,7 @@

import cwl_utils.expression
import cwl_utils.parser
import cwl_utils.parser.utils
from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT

from streamflow.core.context import StreamFlowContext
@@ -84,7 +85,8 @@ async def _get_contents(
):
if (cwl_version not in ("v1.0", "v.1.1")) and size > CONTENT_LIMIT:
raise WorkflowExecutionException(
f"Cannot read contents from files larger than {CONTENT_LIMIT / 1024}kB"
f"Cannot read contents from files larger than "
f"{CONTENT_LIMIT / 1024}kB: file {str(path)} is {size / 1024}kB"
)
return await path.read_text(n=CONTENT_LIMIT)

@@ -606,10 +608,10 @@ async def get_file_token(
def get_name(
name_prefix: str,
cwl_name_prefix: str,
element_id: str,
element_id: Any,
preserve_cwl_prefix: bool = False,
) -> str:
name = element_id.split("#")[-1]
name = (element_id if isinstance(element_id, str) else element_id.id).split("#")[-1]
return (
posixpath.join(posixpath.sep, name)
if preserve_cwl_prefix
Expand Down Expand Up @@ -663,6 +665,54 @@ class LoadListing(Enum):
deep_listing = 2


def process_embedded_tool(
cwl_element: cwl_utils.parser.WorkflowStep,
cwl_name_prefix: str,
step_name: str,
name_prefix: str,
context: MutableMapping[str, Any],
):
run_command = cwl_element.run
inner_context = dict(context)
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
inner_context |= {"version": run_command.cwlVersion}
return run_command, inner_cwl_name_prefix, inner_context


async def process_secondary_files(
context: StreamFlowContext,
cwl_version: str,
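
A minimal sketch of what the relaxed `get_name` signature above allows: the `element_id` may now be a `cwl-utils` object (e.g. a `WorkflowStepOutput`) instead of a plain string. The helper and stand-in class below are hypothetical; only the `isinstance`/`.id` handling mirrors the patched code.

from typing import Any

def _element_name(element_id: Any) -> str:
    # Simplified paraphrase of the patched get_name: accept either a plain
    # string id or an object exposing an `id` attribute.
    raw_id = element_id if isinstance(element_id, str) else element_id.id
    return raw_id.split("#")[-1]

class FakeWorkflowStepOutput:
    # Hypothetical stand-in for cwl_utils.parser.WorkflowStepOutput;
    # only the `id` attribute matters here.
    def __init__(self, id: str):
        self.id = id

assert _element_name("file:///path/to/wf.cwl#step/out1") == "step/out1"
assert _element_name(FakeWorkflowStepOutput("file:///path/to/wf.cwl#step/out1")) == "step/out1"
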
10 changes: 8 additions & 2 deletions streamflow/data/manager.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import logging
import os
import posixpath
from collections.abc import MutableMapping, MutableSequence
@@ -101,7 +102,9 @@ def __init__(self, context: StreamFlowContext):
self.context: StreamFlowContext = context

def __repr__(self):
return self._node_repr(next(iter(self._filesystem.children.values())), 0)
return "\n".join(
self._node_repr(node, 0) for node in self._filesystem.children.values()
)

def _node_repr(self, node: _RemotePathNode, level: int) -> str:
tree = level * "\t" + "|-- " + repr(node) + "\n"
@@ -353,7 +356,8 @@ async def transfer_data(
src_path, context=self.context, location=src_location
).resolve()
) is None:
logger.info(f"Remote file system: {repr(self.path_mapper)}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Remote file system: {repr(self.path_mapper)}")
raise WorkflowExecutionException(
f"Error retrieving realpath for {src_path} on location {src_location} "
f"while transferring it to {dst_path} on deployment {dst_connector.deployment_name}"
@@ -420,6 +424,8 @@ async def transfer_data(
self.register_relation(src_data_location, dst_data_location)
# Otherwise, raise an exception
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Remote file system: {repr(self.path_mapper)}")
raise WorkflowExecutionException(
f"No data locations found for path {src_path} "
f"while trying to map {dst_path} on {dst_location}"
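
A note on the logging change above: `repr(self.path_mapper)` walks the whole remote-filesystem tree, so the dump is now built only when DEBUG is actually enabled. The guard follows the standard `logging` idiom (names below are illustrative, not from the patch):

import logging

logger = logging.getLogger("streamflow")

def dump_remote_fs(path_mapper) -> None:
    # The f-string (and the expensive repr) is evaluated only if DEBUG is on.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"Remote file system: {repr(path_mapper)}")
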