
Incremental data-store generation
dwsutherland committed Jun 29, 2019
1 parent a48169e · commit 2dbda8c
Showing 3 changed files with 126 additions and 16 deletions.
4 changes: 2 additions & 2 deletions cylc/flow/job_pool.py
@@ -109,8 +109,8 @@ def remove_job(self, job_d):
     def remove_task_jobs(self, task_id):
         """Remove all jobs associated with a task from the pool."""
         name, point_string = TaskID.split(task_id)
-        t_id = f"/{point_string}/{name}/"
-        for job_d in self.pool.keys():
+        t_id = f'{point_string}/{name}/'
+        for job_d in list(self.pool.keys()):
             if t_id in job_d:
                 del self.pool[job_d]

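The loop change above is the substance of this fix: Python 3 raises "RuntimeError: dictionary changed size during iteration" if you delete from a dict while iterating its live key view, so the keys are snapshotted with list() first (the same hunk also drops the leading slash from the match substring). A minimal standalone sketch of the pattern, with hypothetical pool contents:

    # Deleting while iterating pool.keys() directly would raise
    # "RuntimeError: dictionary changed size during iteration".
    pool = {
        '20190101T00/foo/01': 'job-1',
        '20190101T00/foo/02': 'job-2',
        '20190102T00/bar/01': 'job-3',
    }
    t_id = '20190101T00/foo/'  # substring match, as in remove_task_jobs
    for job_d in list(pool.keys()):  # iterate over a snapshot of the keys
        if t_id in job_d:
            del pool[job_d]
    assert list(pool) == ['20190102T00/bar/01']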
10 changes: 6 additions & 4 deletions cylc/flow/scheduler.py
@@ -913,6 +913,8 @@ def command_reload_suite(self):
         if self.options.genref or self.options.reftest:
             self.configure_reftest(recon=True)
         self.suite_db_mgr.put_suite_params(self)
+        # Re-initiate data model
+        self.ws_data_mgr.initiate_data_model()
         self.is_updated = True

     def set_suite_timer(self):
@@ -1516,6 +1518,7 @@ def update_profiler_logs(self, tinit):
     def run(self):
         """Main loop."""
         self.initialise_scheduler()
+        self.ws_data_mgr.initiate_data_model()
         while True:  # MAIN LOOP
             tinit = time()

@@ -1586,10 +1589,9 @@ def update_data_structure(self):
             t for t in self.pool.get_all_tasks() if t.state.is_updated]
         has_updated = self.is_updated or updated_tasks
         if has_updated:
-            # UI Server data update
-            # TODO: process the entire pool once with self.is_updated
-            # and update deltas here to be published
-            self.ws_data_mgr.initiate_data_model()
+            # WServer incremental data store update
+            self.ws_data_mgr.increment_graph_elements()
+            self.ws_data_mgr.update_dynamic_elements(updated_tasks)
             # TODO: deprecate state summary manager just use protobuf
             self.state_summary_mgr.update(self)
             # Database update
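Taken together, the scheduler changes move the data store from full regeneration on every change to build-once-then-increment: initiate_data_model runs at startup and on reload, and the main loop afterwards applies only the two incremental calls. A condensed sketch of that control flow (mgr and pool stand in for self.ws_data_mgr and the task pool; a simplification, not the actual Scheduler code):

    def startup(mgr):
        # Full build: definitions, ghost proxies, graph edges,
        # workflow status.
        mgr.initiate_data_model()

    def main_loop_pass(mgr, pool, is_updated):
        updated_tasks = [
            t for t in pool.get_all_tasks() if t.state.is_updated]
        if is_updated or updated_tasks:
            # Grow/prune graph elements only where the cycle-point
            # window of the task pool has moved.
            mgr.increment_graph_elements()
            # Refresh dynamic task/family fields only for changed tasks.
            mgr.update_dynamic_elements(updated_tasks)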
128 changes: 118 additions & 10 deletions cylc/flow/ws_data_mgr.py
@@ -21,6 +21,7 @@
 from copy import copy, deepcopy
 import ast

+from cylc.flow.cycling.loader import get_point
 from cylc.flow.task_id import TaskID
 from cylc.flow.suite_status import (
     SUITE_STATUS_HELD, SUITE_STATUS_STOPPING,
@@ -46,8 +47,11 @@ def __init__(self, schd):
         self.ancestors = {}
         self.descendants = {}
         self.parents = {}
-        self.cycle_states = {}
+        self.min_point = None
+        self.max_point = None
+        self.all_points = set([])
         self.all_states = []
+        self.cycle_states = {}
         self.state_count_cycles = {}
         # managed data
         self.tasks = {}
@@ -59,10 +63,6 @@ def __init__(self, schd):
         self.workflow_id = f"{self.schd.owner}/{self.schd.suite}"
         self.workflow = PbWorkflow()

-    # The following method is inefficiently run on any change
-    # TODO: Add more update methods:
-    # - incremental graph generation and pruning (using cycle points)
-    # - incremental state updates using itask.state.is_updated
     def initiate_data_model(self):
         """Initiate or update data model on start/reload."""
         self.generate_definition_elements()
@@ -71,6 +71,54 @@
         self.update_family_proxies()
         self.update_workflow()

+    def increment_graph_elements(self):
+        """Update graph elements if needed."""
+        elements_updated = False
+        start_point = None
+        stop_point = None
+        edges = {}
+        tproxies = {}
+        fproxies = {}
+        new_min_point = self.schd.pool.get_min_point()
+        new_max_point = self.schd.pool.get_max_point()
+        # start edge generation from the beginning if not already done.
+        if not self.min_point and new_min_point:
+            start_point = str(new_min_point)
+        if not self.max_point or new_max_point > self.max_point:
+            stop_point = str(new_max_point)
+        if start_point is None:
+            start_point = str(self.max_point or '')
+            edges = self.edges
+            tproxies = self.task_proxies
+            fproxies = self.family_proxies
+        # Generate edges for new cycles
+        if start_point and stop_point:
+            self.generate_graph_elements(
+                edges, tproxies, fproxies, start_point, stop_point)
+            elements_updated = True
+        # Prune data store of cycle points not in pool
+        if self.min_point and new_min_point > self.min_point:
+            prune_points = set([])
+            point_strings = []
+            for p in self.all_points:
+                if p < new_min_point:
+                    prune_points.add(p)
+                    point_strings.append(str(p))
+            self._prune_points(point_strings)
+            self.all_points.difference_update(prune_points)
+            elements_updated = True
+        if elements_updated:
+            self.min_point = new_min_point
+            self.max_point = new_max_point
+            self.update_workflow()
+
+    def update_dynamic_elements(self, itasks=None):
+        """Update/populate proxy task and family dynamic fields."""
+        if itasks:
+            self.update_task_proxies([t.identity for t in itasks])
+            self.update_family_proxies([str(t.point) for t in itasks])
+            self.update_workflow()
+
     def generate_definition_elements(self):
         """Generate static definition data elements."""
         config = self.schd.config
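increment_graph_elements treats the task pool's min/max cycle points as a sliding window: generate new elements only when the leading edge advances, and prune stored points once the trailing edge passes them. A self-contained sketch of the same window arithmetic, with integers standing in for the cylc point objects the real code obtains via get_point (a hypothetical simplification, not the actual class):

    class Window:
        def __init__(self):
            self.min_point = None
            self.max_point = None
            self.all_points = set()

        def increment(self, new_min, new_max):
            start = stop = None
            if self.min_point is None and new_min is not None:
                start, stop = new_min, new_max         # first generation pass
            elif self.max_point is None or new_max > self.max_point:
                start, stop = self.max_point, new_max  # window grew forward
            if start is not None:
                self.all_points.update(range(start, stop + 1))  # "generate"
            if self.min_point is not None and new_min > self.min_point:
                stale = {p for p in self.all_points if p < new_min}
                self.all_points -= stale               # prune behind the pool
            self.min_point, self.max_point = new_min, new_max

    w = Window()
    w.increment(1, 3)  # initial window: generate points 1..3
    w.increment(2, 5)  # pool advanced: generate 4..5, prune 1
    assert w.all_points == {2, 3, 4, 5}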
@@ -257,23 +305,24 @@ def _generate_ghost_families(self, family_proxies=None, cycle_points=None):
         for fam, ids in fam_proxy_ids.items():
             self.families[fam].proxies[:] = ids

-    def generate_graph_elements(self, edges=None, graph=None,
+    def generate_graph_elements(self, edges=None,
                                 task_proxies=None, family_proxies=None,
                                 start_point=None, stop_point=None):
         """Generate edges and ghost nodes (proxy elements)."""
         config = self.schd.config
+        graph = PbEdges()
         if edges is None:
             edges = {}
-        if graph is None:
-            graph = PbEdges()
         if task_proxies is None:
             task_proxies = {}
         if family_proxies is None:
             family_proxies = {}
         if start_point is None:
-            start_point = str(self.schd.pool.get_min_point() or '')
+            self.min_point = self.schd.pool.get_min_point()
+            start_point = str(self.min_point or '')
         if stop_point is None:
-            stop_point = str(self.schd.pool.get_max_point() or '')
+            self.max_point = self.schd.pool.get_max_point()
+            stop_point = str(self.max_point or '')

         cycle_points = set([])

@@ -327,6 +376,9 @@ def generate_graph_elements(self, edges=None, graph=None,
                     graph_string=info[3],
                 )

+        new_points = set([get_point(p) for p in cycle_points])
+        self.all_points = self.all_points.union(new_points)
+
         self._generate_ghost_families(family_proxies, cycle_points)
         self.workflow.edges.CopyFrom(graph)
         # Replace the originals (atomic update, for access from other threads).
@@ -479,6 +531,62 @@ def update_workflow(self):
         # Replace the originals (atomic update, for access from other threads).
         self.workflow = workflow

+    def _prune_points(self, point_strings):
+        """Remove old proxies and edges by cycle point."""
+        if not point_strings:
+            return
+        node_ids = []
+        tasks_proxies = {}
+        for t_id, tproxy in list(self.task_proxies.items()):
+            if tproxy.cycle_point in point_strings:
+                node_ids.append(tproxy.id)
+                name, _ = TaskID.split(t_id)
+                tasks_proxies.setdefault(name, []).append(tproxy.id)
+                del self.task_proxies[t_id]
+        for name, id_list in tasks_proxies.items():
+            self.tasks[name].proxies[:] = [
+                tp for tp in self.tasks[name].proxies
+                if tp not in id_list]
+
+        families_proxies = {}
+        for f_id, fproxy in list(self.family_proxies.items()):
+            if fproxy.cycle_point in point_strings:
+                name, _ = TaskID.split(f_id)
+                families_proxies.setdefault(name, []).append(fproxy.id)
+                del self.family_proxies[f_id]
+        for name, id_list in families_proxies.items():
+            self.families[name].proxies[:] = [
+                fp for fp in self.families[name].proxies
+                if fp not in id_list]
+
+        g_eids = []
+        for e_id, edge in list(self.edges.items()):
+            if edge.source in node_ids:
+                del self.edges[e_id]
+                continue
+            g_eids.append(edge.id)
+        self.graph.edges[:] = g_eids
+
+        for point_string in point_strings:
+            self.cycle_states.pop(point_string, None)
+            self.state_count_cycles.pop(point_string, None)
+        # Re-compute state counts (total, and per cycle).
+        all_states = []
+        state_count_cycles = {}
+        for point_string, c_task_states in self.cycle_states.items():
+            count = {}
+            for state in c_task_states.values():
+                if state is None:
+                    continue
+                try:
+                    count[state] += 1
+                except KeyError:
+                    count[state] = 1
+                all_states.append(state)
+            state_count_cycles[point_string] = count
+        self.all_states = all_states
+        self.state_count_cycles = state_count_cycles
+
     def get_entire_workflow(self):
         workflow_msg = PbEntireWorkflow()
         workflow_msg.workflow.CopyFrom(self.workflow)
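_prune_points drops proxies and edges for expired cycle points, then rebuilds the state counts from whatever cycle_states remain. The try/except tally above could equally be written with collections.Counter; a simplified equivalent of the recount step, with hypothetical example data (a design note, not the committed code):

    from collections import Counter

    cycle_states = {
        '20190101T00': {'foo.20190101T00': 'succeeded',
                        'bar.20190101T00': 'running'},
        '20190102T00': {'foo.20190102T00': 'waiting',
                        'bar.20190102T00': None},
    }

    all_states = []
    state_count_cycles = {}
    for point_string, c_task_states in cycle_states.items():
        # Skip None entries, as the committed loop does.
        states = [s for s in c_task_states.values() if s is not None]
        all_states.extend(states)
        state_count_cycles[point_string] = dict(Counter(states))

    assert state_count_cycles['20190101T00'] == {
        'succeeded': 1, 'running': 1}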
