Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix runtime environment (scheduler, validation, etc.) #5570

Merged
merged 7 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 47 additions & 20 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
from cylc.flow.parsec.util import replicate
from cylc.flow.pathutil import (
get_workflow_run_dir,
get_workflow_run_scheduler_log_dir,
get_workflow_run_share_dir,
get_workflow_run_work_dir,
get_workflow_name_from_id
get_workflow_name_from_id,
get_cylc_run_dir,
is_relative_to,
)
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
Expand Down Expand Up @@ -240,20 +238,27 @@ def __init__(
work_dir: Optional[str] = None,
share_dir: Optional[str] = None
) -> None:
"""
Initialize the workflow config object.

Args:
workflow: workflow ID
fpath: workflow config file path
options: CLI options
"""
check_deprecation(Path(fpath))
self.mem_log = mem_log_func
if self.mem_log is None:
self.mem_log = lambda x: None
self.mem_log("config.py:config.py: start init config")
self.workflow = workflow # workflow id
self.workflow = workflow
self.workflow_name = get_workflow_name_from_id(self.workflow)
self.fpath = str(fpath) # workflow definition
self.fdir = os.path.dirname(fpath)
self.run_dir = run_dir or get_workflow_run_dir(self.workflow)
self.log_dir = (log_dir or
get_workflow_run_scheduler_log_dir(self.workflow))
self.share_dir = share_dir or get_workflow_run_share_dir(self.workflow)
self.work_dir = work_dir or get_workflow_run_work_dir(self.workflow)
self.fpath: Path = Path(fpath)
self.fdir = str(self.fpath.parent)
self.run_dir = run_dir
self.log_dir = log_dir
self.share_dir = share_dir
self.work_dir = work_dir
self.options = options
self.implicit_tasks: Set[str] = set()
self.edges: Dict[
Expand Down Expand Up @@ -890,7 +895,7 @@ def _check_implicit_tasks(self) -> None:
)
# Allow implicit tasks in back-compat mode unless rose-suite.conf
# present (to maintain compat with Rose 2019)
elif not Path(self.run_dir, 'rose-suite.conf').is_file():
elif not (self.fpath.parent / "rose-suite.conf").is_file():
LOG.debug(msg)
return

Expand Down Expand Up @@ -1491,19 +1496,41 @@ def print_first_parent_tree(self, pretty=False, titles=False):
print_tree(tree, padding=padding, use_unicode=pretty)

def process_workflow_env(self):
"""Workflow context is exported to the local environment."""
"""Export Workflow context to the local environment.

A source workflow has only a name.
Once installed it also has an ID and a run directory.
And at scheduler start-up it has work, share, and log sub-dirs too.
"""
for key, value in {
**verbosity_to_env(cylc.flow.flags.verbosity),
'CYLC_WORKFLOW_ID': self.workflow,
'CYLC_WORKFLOW_NAME': self.workflow_name,
'CYLC_WORKFLOW_NAME_BASE': str(Path(self.workflow_name).name),
'CYLC_WORKFLOW_RUN_DIR': self.run_dir,
'CYLC_WORKFLOW_LOG_DIR': self.log_dir,
'CYLC_WORKFLOW_WORK_DIR': self.work_dir,
'CYLC_WORKFLOW_SHARE_DIR': self.share_dir,
}.items():
os.environ[key] = value

if is_relative_to(self.fdir, get_cylc_run_dir()):
# This is an installed workflow.
# - self.run_dir is only defined by the scheduler
# - but the run dir exists, created at installation
# - run sub-dirs may exist, if this installation was run already
# but if the scheduler is not running they shouldn't be used.
for key, value in {
'CYLC_WORKFLOW_ID': self.workflow,
'CYLC_WORKFLOW_RUN_DIR': str(self.fdir),
}.items():
os.environ[key] = value

if self.run_dir is not None:
# Run directory is only defined if the scheduler is running; in
# which case the following run sub-directories must exist.
for key, value in {
'CYLC_WORKFLOW_LOG_DIR': str(self.log_dir),
'CYLC_WORKFLOW_WORK_DIR': str(self.work_dir),
'CYLC_WORKFLOW_SHARE_DIR': str(self.share_dir),
}.items():
os.environ[key] = value

def process_config_env(self):
"""Set local config derived environment."""
os.environ['CYLC_UTC'] = str(get_utc_mode())
Expand Down
93 changes: 93 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
from optparse import Values
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from pathlib import Path
import pytest
import logging
from types import SimpleNamespace
from unittest.mock import Mock
from contextlib import suppress

from cylc.flow import CYLC_LOG
from cylc.flow.config import WorkflowConfig
Expand Down Expand Up @@ -1565,3 +1567,94 @@ def test__warn_if_queues_have_implicit_tasks(caplog):
assert "'baz'" not in result
assert f"showing first {max_warning_lines}" in result


@pytest.mark.parametrize(
'installed, run_dir, cylc_vars',
[
pytest.param(
False, # not installed (parsing a source dir)
None, # no run directory passed to config object by scheduler
{
'CYLC_WORKFLOW_NAME': True, # expected environment variables
'CYLC_WORKFLOW_ID': False,
'CYLC_WORKFLOW_RUN_DIR': False,
'CYLC_WORKFLOW_WORK_DIR': False,
'CYLC_WORKFLOW_SHARE_DIR': False,
'CYLC_WORKFLOW_LOG_DIR': False,
},
id="source-dir"
),
pytest.param(
True,
None,
{
'CYLC_WORKFLOW_NAME': True,
'CYLC_WORKFLOW_ID': True,
'CYLC_WORKFLOW_RUN_DIR': True,
'CYLC_WORKFLOW_WORK_DIR': False,
'CYLC_WORKFLOW_SHARE_DIR': False,
'CYLC_WORKFLOW_LOG_DIR': False,
},
id="run-dir"
),
pytest.param(
True,
"/some/path",
{
'CYLC_WORKFLOW_NAME': True,
'CYLC_WORKFLOW_ID': True,
'CYLC_WORKFLOW_RUN_DIR': True,
'CYLC_WORKFLOW_WORK_DIR': True,
'CYLC_WORKFLOW_SHARE_DIR': True,
'CYLC_WORKFLOW_LOG_DIR': True,
},
id="run-dir-from-scheduler"
),
]
)
def test_cylc_env_at_parsing(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
installed,
run_dir,
cylc_vars
):
"""Check that CYLC_ environment vars exported during config file parsing
are appropriate to the workflow context (source, installed, or running).
"""

# Purge environment from previous tests.
for key in cylc_vars.keys():
with suppress(KeyError):
del os.environ[key]

flow_file = tmp_path / WorkflowFiles.FLOW_FILE
flow_config = """
[scheduler]
allow implicit tasks = True
[scheduling]
[[graph]]
R1 = 'foo'
"""

flow_file.write_text(flow_config)

# Make it look as if path is relative to cylc-run (i.e. installed).
monkeypatch.setattr(
'cylc.flow.config.is_relative_to',
lambda _a, _b: installed
)

# Parse the workflow config then check the environment.
WorkflowConfig(
workflow="name", fpath=flow_file, options=Mock(spec=[]),
run_dir=run_dir
)

cylc_env = [k for k in os.environ.keys() if k.startswith('CYLC_')]

for var, expected in cylc_vars.items():
if expected:
assert var in cylc_env
else:
assert var not in cylc_env