Skip to content

Commit

Permalink
Re-create data-store n-window on resize
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Aug 3, 2023
1 parent 2bbd583 commit 6a6e7ce
Showing 1 changed file with 77 additions and 80 deletions.
157 changes: 77 additions & 80 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from typing import (
Any,
Optional,
List,
Set,
TYPE_CHECKING,
Tuple,
Expand Down Expand Up @@ -757,6 +758,8 @@ def increment_graph_window(
'done_ids': set(),
'walk_ids': {active_id},
}
if active_id in self.n_window_completed_walks:
self.n_window_completed_walks.remove(active_id)
active_walk = all_walks[active_id]
active_locs = active_walk['locations']
if source_tokens['task'] not in taskdefs:
Expand All @@ -773,84 +776,50 @@ def increment_graph_window(
)

# Pre-populate from previous walks
# parent branches
if p_tag in active_locs:
# If completed walks are available for all parents, parent walk
# locations of the active task can be considered done/complete.
if active_locs[p_tag].difference(self.n_window_completed_walks):
all_done = True
else:
all_done = False
for p_id in [
p_id
for p_id in active_locs[p_tag]
if p_id in all_walks
]:
for done_loc in [
loc
for loc in all_walks[p_id]['done_locs']
if len(loc) < self.n_edge_distance

]:
p_set = all_walks[p_id]['locations'][done_loc].difference(
active_walk['done_ids']
)
if p_set:
# Add 'p' to map parent loc to active loc
active_locs.setdefault(
p_tag + done_loc,
set()
).update(p_set)
active_walk['walk_ids'].update(p_set)
if all_done:
active_walk['done_locs'].add(done_loc)
# Walk loc may be incomplete (i.e. an inactive parent)
active_walk['done_ids'].update(p_set)
# Child branches
# Will check all location permutations.
# There may be shortcuts for parent locs; however, children will more
# likely be incomplete walks with no 'done_locs', and using parent's
# children will require sifting out cousin branches.
new_locs: List[str]
working_locs: List[str] = []
if c_tag in active_locs:
# Will check all location permutations.
# as unlike the above parents fill, children will more likely be
# incomplete walks with no 'done_locs' and using parent's children
# will require sifting out cousin branches.
working_locs = {'cc', 'cp'}
while working_locs:
for w_loc in working_locs:
loc_done = True
w_set = set()
# Most will be incomplete walks, however, we can check.
# i.e. parents of children may all exist.
if w_loc[:-1] in active_locs:
for b_id in active_locs[w_loc[:-1]]:
if b_id not in all_walks:
loc_done = False
else:
continue
# find child nodes of parent location,
# i.e. 'cpcc' = 'cpc' + 'c'
w_set = set().union(*[
all_walks[b_id]['locations'][w_loc[-1]]
for b_id in active_locs[w_loc[:-1]]
if (
b_id in all_walks
and w_loc[-1] in all_walks[b_id]['locations']
)
])
if w_set:
active_locs[w_loc] = w_set.difference(
active_walk['done_ids']
)
active_walk['walk_ids'].update(w_set)
if loc_done:
active_walk['done_locs'].add(w_loc[:-1])
active_walk['done_ids'].update(active_locs[w_loc[:-1]])
working_locs = set().union(*[
{loc + c_tag, loc + p_tag}
for loc in working_locs
working_locs.extend(('cc', 'cp'))
if p_tag in active_locs:
working_locs.extend(('pp', 'pc'))
while working_locs:
for w_loc in working_locs:
loc_done = True
# Most will be incomplete walks, however, we can check.
# i.e. parents of children may all exist.
if w_loc[:-1] in active_locs:
for b_id in active_locs[w_loc[:-1]]:
if b_id not in all_walks:
loc_done = False
break
else:
continue
# find child nodes of parent location,
# i.e. 'cpcc' = 'cpc' + 'c'
w_set = set().union(*(
all_walks[b_id]['locations'][w_loc[-1]]
for b_id in active_locs[w_loc[:-1]]
if (
loc in active_locs
and len(loc) < self.n_edge_distance
b_id in all_walks
and w_loc[-1] in all_walks[b_id]['locations']
)
])
))
w_set.difference_update(active_walk['walk_ids'])
if w_set:
active_locs[w_loc] = w_set
active_walk['walk_ids'].update(w_set)
if loc_done:
active_walk['done_locs'].add(w_loc[:-1])
active_walk['done_ids'].update(active_locs[w_loc[:-1]])
new_locs = []
for loc in working_locs:
if loc in active_locs and len(loc) < self.n_edge_distance:
new_locs.extend((loc + c_tag, loc + p_tag))
working_locs = new_locs

# Graph walk
node_tokens: Tokens
Expand Down Expand Up @@ -1655,14 +1624,15 @@ def insert_db_job(self, row_idx, row):
def update_data_structure(self, reloaded=False):
"""Workflow batch updates in the data structure."""

# load database history for flagged nodes
self.apply_task_proxy_db_history()

# Avoids changing window edge distance during edge/node creation
if self.next_n_edge_distance is not None:
self.n_edge_distance = self.next_n_edge_distance
self.window_resize_rewalk()
self.next_n_edge_distance = None

# load database history for flagged nodes
self.apply_task_proxy_db_history()

self.prune_data_store()
if self.state_update_families:
self.update_family_proxies()
Expand Down Expand Up @@ -1696,6 +1666,32 @@ def update_data_structure(self, reloaded=False):
self.clear_delta_batch()
self.clear_delta_store()

def window_resize_rewalk(self):
    """Re-create data-store n-window on resize.

    Collects the window nodes currently recorded against tasks in the
    pool, clears the prune flags and stored walks, re-runs the graph
    window walk for every pooled task, and finally flags for pruning
    every previously collected node that is absent from the rebuilt
    windows.
    """
    def pooled_window_nodes():
        # Union of the node sets held in n_window_nodes for keys that
        # are present in the task pool.
        gathered = set()
        for active_id, window_nodes in self.n_window_nodes.items():
            if active_id in self.all_task_pool:
                gathered.update(window_nodes)
        return gathered

    # Snapshot taken before anything is cleared or re-walked.
    nodes_before = pooled_window_nodes()

    # Start from a clean slate: no pending prune flags, no stored walks.
    self.prune_flagged_nodes.clear()
    self.n_window_node_walks.clear()

    node_tokens: Tokens
    for pool_id in self.all_task_pool:
        node_tokens = Tokens(pool_id)
        # Only the fetched proxy is needed; the returned ID is unused.
        _, tproxy = self.store_node_fetcher(node_tokens)
        cycle_point = get_point(node_tokens['cycle'])
        self.increment_graph_window(
            node_tokens,
            cycle_point,
            tproxy.flow_nums
        )

    # Anything in the old windows but not in the new ones gets flagged.
    self.prune_flagged_nodes.update(
        nodes_before - pooled_window_nodes()
    )

def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""

Expand Down Expand Up @@ -1967,8 +1963,9 @@ def set_graph_window_extent(self, n_edge_distance):
Maximum edge distance from active node.
"""
self.next_n_edge_distance = n_edge_distance
self.updates_pending = True
if n_edge_distance != self.n_edge_distance:
self.next_n_edge_distance = n_edge_distance
self.updates_pending = True

def update_workflow(self):
"""Update workflow element status and state totals."""
Expand Down

0 comments on commit 6a6e7ce

Please sign in to comment.