Skip to content

Commit

Permalink
Using a unified %task magic to handle task related actions #155 #154
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Dec 6, 2018
1 parent 119f73b commit 8a003d4
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 49 deletions.
62 changes: 36 additions & 26 deletions src/sos_notebook/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -621,27 +621,32 @@ define([
}

let action_func = {
'pending': kill_task,
'submitted': kill_task,
'running': kill_task,
'completed': resume_task,
'failed': resume_task,
'aborted': resume_task,
'missing': function(){},
'pending': 'kill',
'submitted': 'kill',
'running': 'kill',
'completed': 'resume',
'failed': 'resume',
'aborted': 'resume',
'missing': '',
}

// look for status etc and update them.
let onmouseover = `onmouseover="this.classList='fa fa-2x fa-fw ${action_class[info.status]}'"`;
let onmouseleave = `onmouseleave="this.classList='fa fa-2x fa-fw ${status_class[info.status]}'"`;
let onclick = `onclick="${action_func[info.status]}('${info.task_id}', '${info.queue}');"`;
let onclick = `onclick="task_action({action: '${action_func[info.status]}', task: '${info.task_id}', queue:'${info.queue}'});"`;
let tags = info.tags.split(/\s+/g);
let tags_elems = ''
for (let ti=0; ti < tags.length; ++ti) {
let tag = tags[ti];
if (!tag) {
continue;
}
tags_elems += `<pre class="task_tag_${tag}">${tag}</pre>`;
tags_elems += `<pre class="task_tags task_tag_${tag}">${tag}` +
`<div class="task_tag_actions">` +
`<i class="fa fa-fw fa-refresh" onclick="task_action({action:'status', tag:'${tag}', queue: '${info.queue}'})"></i>` +
`<i class="fa fa-fw fa-stop"" onclick="task_action({action:'kill', tag:'${tag}', queue: '${info.queue}'})"></i>` +
`<i class="fa fa-fw fa-trash"" onclick="task_action({action:'purge', tag:'${tag}', queue: '${info.queue}'})"></i>` +
`</div></pre>`;
}

let data = {
Expand All @@ -657,7 +662,7 @@ define([
${onmouseover} ${onmouseleave} ${onclick}></i>
</td>
<td class="task_id">
<div onclick="task_info('${info.task_id}', '${info.queue}')">
<div onclick="task_action({action:'status', task:'${info.task_id}', queue:'${info.queue}'})">
<pre><i class="fa fa-fw fa-sitemap"></i>${info.task_id}</pre>
</div>
</td>
Expand Down Expand Up @@ -989,22 +994,14 @@ define([
});
};

window.kill_task = function(task_id, task_queue) {
console.log("Kill " + task_id);
send_kernel_msg({
"kill-task": [task_id, task_queue],
});
};

window.resume_task = function(task_id, task_queue) {
console.log("Resume " + task_id);
send_kernel_msg({
"resume-task": [task_id, task_queue],
});
};

window.task_info = function(task_id, task_queue) {
create_panel_cell(`%taskinfo ${task_id} -q ${task_queue}`).execute();
window.task_action = function(param) {
console.log(param)
if (!param.action) {
return;
}
create_panel_cell(`%task ${param.action} ${param.task}` +
(param.tag ? ` -t ${param.tag}` : '') +
(param.queue ? ` -q ${param.queue}` : '')).execute();
scrollPanel();
};

Expand Down Expand Up @@ -1953,6 +1950,19 @@ table.task_table {
border: 0px;
}
.task_tag_actions {
display: none;
}
.task_tag_actions .fa:hover {
color: blue;
}
.task_tags:hover .task_tag_actions {
display: flex;
flex-direction: row;
}
table.workflow_table i,
table.task_table i {
margin-right: 5px;
Expand Down
159 changes: 136 additions & 23 deletions src/sos_notebook/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1883,38 +1883,126 @@ def apply(self, code, silent, store_history, user_expressions, allow_stdin):
}


class TaskInfo_Magic(SoS_Magic):
name = 'taskinfo'
class Task_Magic(SoS_Magic):
name = 'task'

def __init__(self, kernel):
super(TaskInfo_Magic, self).__init__(kernel)
super(Task_Magic, self).__init__(kernel)

def get_parser(self):
parser = argparse.ArgumentParser(prog='%taskinfo',
parser = argparse.ArgumentParser(prog='%task',
description='''Get information on specified task. By default
sos would query against all running task queues but it would
start a task queue and query status if option -q is specified.
''')
parser.add_argument('task', help='ID of task')
parser.add_argument('-q', '--queue',
help='''Task queue on which the task is executed.''')
parser.add_argument('-c', '--config', help='''A configuration file with host
definitions, in case the definitions are not defined in global or local
sos config.yml files.''')
subparsers = parser.add_subparsers(help='actions')
status = subparsers.add_parser('status', help='task status')
status.add_argument('tasks', nargs='*', help='''ID of the task. All tasks
that are releted to the workflow executed under the current directory
will be checked if unspecified. There is no need to specify compelete
task IDs because SoS will match specified name with tasks starting with
these names.''')
status.add_argument('-q', '--queue',
help='''Check the status of job on specified tasks queue or remote host
if the tasks . The queue can be defined in global or local sos
configuration file, or a file specified by option --config. A host is
assumed to be a remote machine with process type if no configuration
is found.''')
status.add_argument('-c', '--config', help='''A configuration file with host
definitions, in case the definitions are not defined in global sos config.yml files.''')
status.add_argument('-a', '--all', action='store_true',
help='''Check the status of all tasks on local or specified remote task queue,
including tasks created by workflows executed from other directories.''')
status.add_argument('-v', dest='verbosity', type=int, choices=range(5), default=2,
help='''Output error (0), warning (1), info (2), debug (3) and trace (4)
information to standard output (default to 2).''')
status.add_argument('-t', '--tags', nargs='*', help='''Only list tasks with
one of the specified tags.''')
status.add_argument('-s', '--status', nargs='*', help='''Display tasks with
one of the specified status.''')
status.add_argument('--age', help='''Limit to tasks that are created more than
(default) or within specified age. Value of this parameter can be in units
s (second), m (minute), h (hour), or d (day, default), or in the foramt of
HH:MM:SS, with optional prefix + for older (default) and - for newer than
specified age.''')
status.add_argument('--html', action='store_true',
help='''Output results in HTML format. This option will override option
verbosity and output detailed status information in HTML tables and
figures.''')
status.add_argument('--numeric-times', action='store_true',
help=argparse.SUPPRESS)
status.set_defaults(func=self.status)

resume = subparsers.add_parser('resume', help='task status')
resume.add_argument('tasks', nargs='*', help='''ID of the tasks to be removed.
There is no need to specify compelete task IDs because SoS will match specified
name with tasks starting with these names. If no task ID is specified,
all tasks related to specified workflows (option -w) will be removed.''')
resume.add_argument('-q', '--queue',
help='''Remove tasks on specified tasks queue or remote host
if the tasks . The queue can be defined in global or local sos
configuration file, or a file specified by option --config. A host is
assumed to be a remote machine with process type if no configuration
is found. ''')
resume.set_defaults(func=self.resume)

kill = subparsers.add_parser('kill', help='task status')
kill.add_argument('tasks', nargs='*', help='''IDs of the tasks
that will be killed. There is no need to specify compelete task IDs because
SoS will match specified name with tasks starting with these names.''')
kill.add_argument('-a', '--all', action='store_true',
help='''Kill all tasks in local or specified remote task queue''')
kill.add_argument('-q', '--queue',
help='''Kill jobs on specified tasks queue or remote host
if the tasks . The queue can be defined in global or local sos
configuration file, or a file specified by option --config. A host is
assumed to be a remote machine with process type if no configuration
is found.''')
kill.add_argument('-t', '--tags', nargs='*', help='''Only kill tasks with
one of the specified tags.''')
kill.add_argument('-c', '--config', help='''A configuration file with host
definitions, in case the definitions are not defined in global sos config.yml files.''')
kill.add_argument('-v', '--verbosity', type=int, choices=range(5), default=2,
help='''Output error (0), warning (1), info (2), debug (3) and trace (4)
information to standard output (default to 2).''')
kill.set_defaults(func=self.kill)

purge = subparsers.add_parser('purge', help='task status')
purge.add_argument('tasks', nargs='*', help='''ID of the tasks to be removed.
There is no need to specify compelete task IDs because SoS will match specified
name with tasks starting with these names. If no task ID is specified,
all tasks related to specified workflows (option -w) will be removed.''')
purge.add_argument('-a', '--all', action='store_true',
help='''Clear all task information on local or specified remote task queue,
including tasks created by other workflows.''')
purge.add_argument('--age', help='''Limit to tasks that are created more than
(default) or within specified age. Value of this parameter can be in units
s (second), m (minute), h (hour), or d (day, default), or in the foramt of
HH:MM:SS, with optional prefix + for older (default) and - for newer than
specified age.''')
purge.add_argument('-s', '--status', nargs='+', help='''Only remove tasks with
specified status, which can be pending, submitted, running, completed, failed,
and aborted. One of more status can be specified.''')
purge.add_argument('-t', '--tags', nargs='*', help='''Only remove tasks with
one of the specified tags.''')
purge.add_argument('-q', '--queue',
help='''Remove tasks on specified tasks queue or remote host
if the tasks . The queue can be defined in global or local sos
configuration file, or a file specified by option --config. A host is
assumed to be a remote machine with process type if no configuration
is found. ''')
purge.add_argument('-c', '--config', help='''A configuration file with host
definitions, in case the definitions are not defined in global sos config.yml files.''')
purge.add_argument('-v', dest='verbosity', type=int, choices=range(5), default=2,
help='''Output error (0), warning (1), info (2), debug (3) and trace (4)
information to standard output (default to 2).''')
purge.set_defaults(func=self.purge)
parser.error = self._parse_error
return parser

def apply(self, code, silent, store_history, user_expressions, allow_stdin):
options, remaining_code = self.get_magic_and_code(code, False)
parser = self.get_parser()
try:
args = parser.parse_args(options.split())
except SystemExit:
return
if args.config:
from sos.utils import load_cfg_files
load_cfg_files(args.config)

def status(self, args):
self.sos_kernel.warn(args)
return
from sos.hosts import Host
host = Host(args.queue)
result = host._task_engine.query_tasks(
Expand All @@ -1934,6 +2022,32 @@ def apply(self, code, silent, store_history, user_expressions, allow_stdin):
status = result.split(
'>Status<', 1)[-1].split('</div', 1)[0].split('>')[-1]
host._task_engine.update_task_status(args.task, status)


def resume(self, args):
self.sos_kernel.warn(args)
return

def kill(self, args):
self.sos_kernel.warn(args)
return

def purge(self, args):
self.sos_kernel.warn(args)
return

def apply(self, code, silent, store_history, user_expressions, allow_stdin):
options, remaining_code = self.get_magic_and_code(code, False)
parser = self.get_parser()
try:
args = parser.parse_args(options.split())
except SystemExit:
return
if args.config:
from sos.utils import load_cfg_files
load_cfg_files(args.config)

args.func(args)
return self.sos_kernel._do_execute(remaining_code, silent, store_history, user_expressions, allow_stdin)


Expand Down Expand Up @@ -2236,8 +2350,7 @@ class SoS_Magics(object):
Shutdown_Magic,
SoSRun_Magic,
SoSSave_Magic,
TaskInfo_Magic,
Tasks_Magic,
Task_Magic,
Toc_Magic,
Sandbox_Magic,
Use_Magic,
Expand Down

0 comments on commit 8a003d4

Please sign in to comment.