diff --git a/CHANGES.md b/CHANGES.md index 482508745f3..4d3e9c148b6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -36,6 +36,9 @@ Maintenance release. ### Fixes +[#5192](https://github.com/cylc/cylc-flow/pull/5192) - Recompute runahead limit +after use of `cylc remove`. + [#5125](https://github.com/cylc/cylc-flow/pull/5125) - Allow rose-suite.conf changes to be considered by ``cylc reinstall``. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index c9d7ca483ea..d0dcb18fd9e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -105,8 +105,8 @@ def __init__( self.main_pool: Pool = {} self.hidden_pool: Pool = {} - self.main_pool_list: List[TaskProxy] = [] - self.hidden_pool_list: List[TaskProxy] = [] + self._main_pool_list: List[TaskProxy] = [] + self._hidden_pool_list: List[TaskProxy] = [] self.main_pool_changed = False self.hidden_pool_changed = False @@ -703,22 +703,24 @@ def get_all_tasks(self) -> List[TaskProxy]: def get_tasks(self) -> List[TaskProxy]: """Return a list of task proxies in the main pool.""" + # Cached list only for use internally in this method. if self.main_pool_changed: self.main_pool_changed = False - self.main_pool_list = [] + self._main_pool_list = [] for _, itask_id_map in self.main_pool.items(): for __, itask in itask_id_map.items(): - self.main_pool_list.append(itask) - return self.main_pool_list + self._main_pool_list.append(itask) + return self._main_pool_list def get_hidden_tasks(self) -> List[TaskProxy]: """Return a list of task proxies in the hidden pool.""" + # Cached list only for use internally in this method. if self.hidden_pool_changed: self.hidden_pool_changed = False - self.hidden_pool_list = [] + self._hidden_pool_list = [] for itask_id_maps in self.hidden_pool.values(): - self.hidden_pool_list.extend(list(itask_id_maps.values())) - return self.hidden_pool_list + self._hidden_pool_list.extend(list(itask_id_maps.values())) + return self._hidden_pool_list def get_tasks_by_point(self): """Return a map of task proxies by cycle point.""" @@ -1530,6 +1532,8 @@ def remove_tasks(self, items): itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: self.remove(itask, 'request') + if self.compute_runahead(): + self.release_runahead_tasks() return len(bad_items) def force_trigger_tasks( @@ -1718,8 +1722,8 @@ def stop_flow(self, flow_num): def log_task_pool(self, log_lvl=logging.DEBUG): """Log content of task and prerequisite pools in debug mode.""" for pool, name in [ - (self.main_pool_list, "Main"), - (self.hidden_pool_list, "Hidden") + (self.get_tasks(), "Main"), + (self.get_hidden_tasks(), "Hidden") ]: if pool: LOG.log( diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 86adf380681..a81a504d126 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -558,3 +558,21 @@ async def test_reload_stopcp( assert str(schd.pool.stop_point) == '2020' schd.command_reload_workflow() assert str(schd.pool.stop_point) == '2020' + + +async def test_runahead_after_remove( + example_flow: Scheduler +) -> None: + """The runahead limit should be recomputed after tasks are removed. + + """ + task_pool = example_flow.pool + assert int(task_pool.runahead_limit_point) == 4 + + # No change after removing an intermediate cycle. + task_pool.remove_tasks(['3/*']) + assert int(task_pool.runahead_limit_point) == 4 + + # Should update after removing the first point. + task_pool.remove_tasks(['1/*']) + assert int(task_pool.runahead_limit_point) == 5