Skip to content

Commit

Permalink
Merge pull request #2584 from TomekTrzeciak/ordered_queue
Browse files Browse the repository at this point in the history
Use OrderedDict for queues to make them FIFO
  • Loading branch information
matthewrmshin authored Mar 16, 2018
2 parents 98381de + fbf680f commit 94693de
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 16 deletions.
5 changes: 4 additions & 1 deletion doc/src/cylc-user-guide/cug.tex
Original file line number Diff line number Diff line change
Expand Up @@ -6857,7 +6857,10 @@ \subsection{Limiting Activity With Internal Queues}
Large suites can potentially overwhelm task hosts by submitting too many
tasks at once. You can prevent this with {\em internal queues}, which
limit the number of tasks that can be active (submitted or running)
at the some time.
at the same time.
Internal queues behave in the first-in-first-out (FIFO) manner, i.e.\ tasks are
released from a queue in the same order that they were queued.
A queue is defined by a {\em name}; a {\em limit}, which is the maximum
number of active tasks allowed for the queue; and a list of {\em members},
Expand Down
37 changes: 22 additions & 15 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
TASK_STATUS_RETRYING)
from cylc.wallclock import (
get_current_time_string, get_time_string_from_unix_time)
from parsec.OrderedDict import OrderedDict


class TaskPool(object):
Expand Down Expand Up @@ -208,7 +209,7 @@ def add_to_runahead_pool(self, itask, is_restart=False):
itask.state.set_held()

# add to the runahead pool
self.runahead_pool.setdefault(itask.point, {})
self.runahead_pool.setdefault(itask.point, OrderedDict())
self.runahead_pool[itask.point][itask.identity] = itask
self.rhpool_changed = True

Expand Down Expand Up @@ -460,7 +461,7 @@ def release_runahead_task(self, itask):
queue = self.myq[itask.tdef.name]
except KeyError:
queue = self.config.Q_DEFAULT
self.queues.setdefault(queue, {})
self.queues.setdefault(queue, OrderedDict())
self.queues[queue][itask.identity] = itask
self.pool.setdefault(itask.point, {})
self.pool[itask.point][itask.identity] = itask
Expand Down Expand Up @@ -568,25 +569,32 @@ def get_ready_tasks(self):
Return the tasks that are dequeued.
"""

# 1) queue unqueued tasks that are ready to run or manually forced
now = time()
for itask in self.get_tasks():
if itask.state.status != TASK_STATUS_QUEUED:
# only need to check that unqueued tasks are ready
if itask.manual_trigger or itask.ready_to_run(now):
# queue the task
itask.state.reset_state(TASK_STATUS_QUEUED)
itask.reset_manual_trigger()

# 2) submit queued tasks if manually forced or not queue-limited
ready_tasks = []
qconfig = self.config.cfg['scheduling']['queues']

for queue in self.queues:
# 2.1) count active tasks and compare to queue limit
tasks = self.queues[queue].values()

# 1) queue unqueued tasks that are ready to run or manually forced
for itask in tasks:
if itask.state.status != TASK_STATUS_QUEUED:
# only need to check that unqueued tasks are ready
if itask.manual_trigger or itask.ready_to_run(now):
# queue the task
itask.state.reset_state(TASK_STATUS_QUEUED)
itask.reset_manual_trigger()
# move the task to the back of the queue
self.queues[queue][itask.identity] = \
self.queues[queue].pop(itask.identity)

# 2) submit queued tasks if manually forced or not queue-limited
n_active = 0
n_release = 0
n_limit = qconfig[queue]['limit']
tasks = self.queues[queue].values()

# 2.1) count active tasks and compare to queue limit
if n_limit:
for itask in tasks:
if itask.state.status in [TASK_STATUS_READY,
Expand Down Expand Up @@ -674,8 +682,7 @@ def set_do_reload(self, config, stop_point):
if itask.tdef.name not in self.myq:
continue
key = self.myq[itask.tdef.name]
if key not in new_queues:
new_queues[key] = {}
new_queues.setdefault(key, OrderedDict())
new_queues[key][id_] = itask
self.queues = new_queues

Expand Down
32 changes: 32 additions & 0 deletions tests/queues/02-queueorder.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2018 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test job script sets dependencies in evironment.
. "$(dirname "${0}")/test_header"
set_test_number 3

install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
run_ok "${TEST_NAME_BASE}-run" \
cylc run "${SUITE_NAME}" --reference-test --debug --no-detach
run_ok "${TEST_NAME_BASE}-test" bash -o pipefail -c "
cylc cat-log '${SUITE_NAME}' |
grep 'proc_n.*submitted at' |
sort --key=4,4 --check"

purge_suite "${SUITE_NAME}"
exit
17 changes: 17 additions & 0 deletions tests/queues/02-queueorder/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
2018-03-13T14:22:38Z INFO - Initial point: 1
2018-03-13T14:22:38Z INFO - Final point: 1
2018-03-13T14:22:38Z INFO - [delay_n1.1] -triggered off []
2018-03-13T14:22:38Z INFO - [hold.1] -triggered off []
2018-03-13T14:22:40Z INFO - [delay_n2.1] -triggered off ['delay_n1.1']
2018-03-13T14:22:42Z INFO - [delay_n3.1] -triggered off ['delay_n2.1']
2018-03-13T14:22:44Z INFO - [delay_n4.1] -triggered off ['delay_n3.1']
2018-03-13T14:22:47Z INFO - [delay_n5.1] -triggered off ['delay_n4.1']
2018-03-13T14:22:48Z INFO - [proc_n1.1] -triggered off ['delay_n1.1']
2018-03-13T14:22:49Z INFO - [delay_n6.1] -triggered off ['delay_n5.1']
2018-03-13T14:22:50Z INFO - [proc_n2.1] -triggered off ['delay_n2.1']
2018-03-13T14:22:52Z INFO - [delay_n7.1] -triggered off ['delay_n6.1']
2018-03-13T14:22:52Z INFO - [proc_n3.1] -triggered off ['delay_n3.1']
2018-03-13T14:22:54Z INFO - [proc_n4.1] -triggered off ['delay_n4.1']
2018-03-13T14:22:56Z INFO - [proc_n5.1] -triggered off ['delay_n5.1']
2018-03-13T14:22:58Z INFO - [proc_n6.1] -triggered off ['delay_n6.1']
2018-03-13T14:23:00Z INFO - [proc_n7.1] -triggered off ['delay_n7.1']
19 changes: 19 additions & 0 deletions tests/queues/02-queueorder/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[cylc]
[[parameters]]
n = 1..7
[scheduling]
[[queues]]
[[[q1]]]
limit = 1
members = proc<n>, hold
[[dependencies]]
graph = """
delay<n-1> => delay<n>
delay<n> => proc<n>
hold
"""
[runtime]
[[delay<n>]]
[[proc<n>]]
[[hold]]
script = sleep 7

0 comments on commit 94693de

Please sign in to comment.