From 0d7153002d45e98b64a1abbb4f53334710d32f94 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Fri, 19 Jul 2024 17:51:01 -0400 Subject: [PATCH] Abstractions for reasoning about steps outside of converted code. --- gxformat2/converter.py | 109 +++-------------------------- gxformat2/model.py | 135 +++++++++++++++++++++++++++++++++++- tests/test_model_helpers.py | 49 +++++++++++++ 3 files changed, 191 insertions(+), 102 deletions(-) create mode 100644 tests/test_model_helpers.py diff --git a/gxformat2/converter.py b/gxformat2/converter.py index 25f264d..dcaa776 100644 --- a/gxformat2/converter.py +++ b/gxformat2/converter.py @@ -3,7 +3,6 @@ import argparse import copy import json -import logging import os import sys import uuid @@ -11,9 +10,13 @@ from ._labels import Labels from .model import ( + clean_connection, convert_dict_to_id_list_if_needed, ensure_step_position, inputs_as_native_steps, + pop_connect_from_step_dict, + setup_connected_values, + SUPPORT_LEGACY_CONNECTIONS, with_step_ids, ) from .yaml import ordered_load @@ -22,8 +25,6 @@ Convert a Format 2 Galaxy workflow description into a native format. """ -# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output -SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1" STEP_TYPES = [ "subworkflow", "data_input", @@ -82,22 +83,11 @@ }, } -log = logging.getLogger(__name__) - def rename_arg(argument): return argument -def clean_connection(value): - if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS: - # Hope these are just used by Galaxy testing workflows and such, and not in production workflows. - log.warn(f"Legacy workflow syntax for connections [{value}] will not be supported in the future") - value = value.replace("#", "/", 1) - else: - return value - - class ImportOptions: def __init__(self): @@ -381,7 +371,7 @@ def transform_pause(context, step, default_name="Pause for dataset review"): "name": name } - connect = _init_connect_dict(step) + connect = pop_connect_from_step_dict(step) _populate_input_connections(context, step, connect) _populate_tool_state(step, tool_state) @@ -398,7 +388,7 @@ def transform_subworkflow(context, step): tool_state = { } - connect = _init_connect_dict(step) + connect = pop_connect_from_step_dict(step) _populate_input_connections(context, step, connect) _populate_tool_state(step, tool_state) @@ -407,10 +397,6 @@ def _runtime_value(): return {"__class__": "RuntimeValue"} -def _connected_value(): - return {"__class__": "ConnectedValue"} - - def transform_tool(context, step): if "tool_id" not in step: raise Exception("Tool steps must define a tool_id.") @@ -428,48 +414,13 @@ def transform_tool(context, step): "__page__": 0, } - connect = _init_connect_dict(step) - - def append_link(key, value): - if key not in connect: - connect[key] = [] - assert "$link" in value - link_value = value["$link"] - connect[key].append(clean_connection(link_value)) - - def replace_links(value, key=""): - if _is_link(value): - append_link(key, value) - # Filled in by the connection, so to force late - # validation of the field just mark as ConnectedValue, - # which should be further validated by Galaxy - return _connected_value() - if isinstance(value, dict): - new_values = {} - for k, v in value.items(): - new_key = _join_prefix(key, k) - new_values[k] = replace_links(v, new_key) - return new_values - elif isinstance(value, list): - new_values = [] - for i, v in enumerate(value): - # If we are a repeat we need to modify the key - # but not if values are actually $links. - if _is_link(v): - append_link(key, v) - new_values.append(None) - else: - new_key = "%s_%d" % (key, i) - new_values.append(replace_links(v, new_key)) - return new_values - else: - return value + connect = pop_connect_from_step_dict(step) # TODO: handle runtime inputs and state together. runtime_inputs = step.get("runtime_inputs", []) if "state" in step or runtime_inputs: step_state = step.pop("state", {}) - step_state = replace_links(step_state) + step_state = setup_connected_values(step_state, append_to=connect) for key, value in step_state.items(): tool_state[key] = json.dumps(value) @@ -629,50 +580,6 @@ def _action(type, name, arguments): } -def _is_link(value): - return isinstance(value, dict) and "$link" in value - - -def _join_prefix(prefix, key): - if prefix: - new_key = f"{prefix}|{key}" - else: - new_key = key - return new_key - - -def _init_connect_dict(step): - if "connect" not in step: - step["connect"] = {} - - connect = step["connect"] - del step["connect"] - - # handle CWL-style in dict connections. - if "in" in step: - step_in = step["in"] - assert isinstance(step_in, dict) - connection_keys = set() - for key, value in step_in.items(): - # TODO: this can be a list right? - if isinstance(value, dict) and 'source' in value: - value = value["source"] - elif isinstance(value, dict) and 'default' in value: - continue - elif isinstance(value, dict): - raise KeyError(f'step input must define either source or default {value}') - connect[key] = [value] - connection_keys.add(key) - - for key in connection_keys: - del step_in[key] - - if len(step_in) == 0: - del step['in'] - - return connect - - def _populate_input_connections(context, step, connect): _ensure_inputs_connections(step) input_connections = step["input_connections"] diff --git a/gxformat2/model.py b/gxformat2/model.py index b3d3ef5..de1d39d 100644 --- a/gxformat2/model.py +++ b/gxformat2/model.py @@ -1,7 +1,140 @@ """Abstractions for dealing with Format2 data.""" -from typing import cast, Dict, List, Union +import logging +import os +from typing import ( + Any, + Callable, + cast, + Dict, + List, + Optional, + Union, +) + +from typing_extensions import TypedDict + +log = logging.getLogger(__name__) DictOrList = Union[Dict, List] +ConnectDict = dict + + +EmbeddedLink = TypedDict("EmbeddedLink", {"$link": str}) + +# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output +SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1" + + +def pop_connect_from_step_dict(step: dict) -> ConnectDict: + """Merge 'in' and 'connect' keys into a unified connection dict separated from state. + + Meant to be used an initial processing step in reasoning about connections defined by the + format2 step description. + """ + if "connect" not in step: + step["connect"] = {} + + connect = step["connect"] + del step["connect"] + + # handle CWL-style in dict connections. + if "in" in step: + step_in = step["in"] + assert isinstance(step_in, dict) + connection_keys = set() + for key, value in step_in.items(): + # TODO: this can be a list right? + if isinstance(value, dict) and 'source' in value: + value = value["source"] + elif isinstance(value, dict) and 'default' in value: + continue + elif isinstance(value, dict): + raise KeyError(f'step input must define either source or default {value}') + connect[key] = [value] + connection_keys.add(key) + + for key in connection_keys: + del step_in[key] + + if len(step_in) == 0: + del step['in'] + + return connect + + +AppendLinkCallable = Callable[[str, EmbeddedLink], None] + + +def setup_connected_values(value, key: str = "", append_to: Optional[Dict[str, list]] = None) -> Any: + """Replace links with connected value.""" + + def append_link(key: str, value: dict): + if append_to is None: + return + + if key not in append_to: + append_to[key] = [] + + assert "$link" in value + link_value = value["$link"] + append_to[key].append(clean_connection(link_value)) + + def recurse(sub_value, sub_key) -> Any: + return setup_connected_values(sub_value, sub_key, append_to=append_to) + + if _is_link(value): + append_link(key, value) + # Filled in by the connection, so to force late + # validation of the field just mark as ConnectedValue, + # which should be further validated by Galaxy + return _connected_value() + if isinstance(value, dict): + new_dict_values: Dict[str, Any] = {} + for dict_k, dict_v in value.items(): + new_key = _join_prefix(key, dict_k) + new_dict_values[dict_k] = recurse(dict_v, new_key) + return new_dict_values + elif isinstance(value, list): + new_list_values: List[Any] = [] + for i, list_v in enumerate(value): + # If we are a repeat we need to modify the key + # but not if values are actually $links. + if _is_link(list_v): + assert isinstance(list_v, dict) + append_link(key, list_v) + new_list_values.append(None) + else: + new_key = "%s_%d" % (key, i) + new_list_values.append(recurse(list_v, new_key)) + return new_list_values + else: + return value + + +def clean_connection(value: str) -> str: + """Convert legacy style connection targets with modern CWL-style ones.""" + if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS: + # Hope these are just used by Galaxy testing workflows and such, and not in production workflows. + log.warn(f"Legacy workflow syntax for connections [{value}] will not be supported in the future") + value = value.replace("#", "/", 1) + + return value + + +def _connected_value(): + return {"__class__": "ConnectedValue"} + + +def _is_link(value: Any) -> bool: + return isinstance(value, dict) and "$link" in value + + +def _join_prefix(prefix: Optional[str], key: str): + if prefix: + new_key = f"{prefix}|{key}" + else: + new_key = key + return new_key def convert_dict_to_id_list_if_needed( diff --git a/tests/test_model_helpers.py b/tests/test_model_helpers.py new file mode 100644 index 0000000..d538e2f --- /dev/null +++ b/tests/test_model_helpers.py @@ -0,0 +1,49 @@ +from gxformat2.model import ( + pop_connect_from_step_dict, + setup_connected_values, +) + + +def test_pop_connect(): + raw_step = { + "in": { + "bar": { + "source": "foo/moo", + }, + }, + } + connect = pop_connect_from_step_dict(raw_step) + assert connect["bar"] == ["foo/moo"] + assert "in" not in raw_step + + +def test_pop_connect_preserves_defaults(): + raw_step = { + "in": { + "bar": { + "default": 7, + }, + }, + } + connect = pop_connect_from_step_dict(raw_step) + assert "bar" not in connect + assert "in" in raw_step + + +def test_setup_connected_values(): + raw_state = { + "input": {"$link": "moo/cow"}, + } + connect = {} + setup_connected_values(raw_state, append_to=connect) + assert connect["input"][0] == "moo/cow" + + +def test_setup_connected_values_in_array(): + raw_state = { + "input": [{"$link": "moo/cow"}, {"$link": "moo/cow2"}], + } + connect = {} + setup_connected_values(raw_state, append_to=connect) + assert connect["input"][0] == "moo/cow" + assert connect["input"][1] == "moo/cow2"