Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jobs commands #1497

Merged
merged 6 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- High priority for sendmail tasks [#1484](https://github.com/opendatateam/udata/pull/1484)
- Improve tasks/jobs queues routing [#1487](https://github.com/opendatateam/udata/pull/1487)
- Add security.send_confirmation template [#1475](https://github.com/opendatateam/udata/pull/1475)
- Add the `udata job schedule|unschedule|scheduled` commands [#1497](https://github.com/opendatateam/udata/pull/1497)

## 1.2.11 (2018-02-05)

Expand Down
51 changes: 41 additions & 10 deletions docs/administrative-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ You can list available jobs with:

```shell
$ udata job list
-> log-test
-> purge-organizations
-> purge-datasets
-> bump-metrics
-> purge-reuses
-> error-test
-> harvest
-> send-frequency-reminder
-> crawl-resources
-> count-tags
log-test
purge-organizations
purge-datasets
bump-metrics
purge-reuses
error-test
harvest
send-frequency-reminder
crawl-resources
count-tags
```

You can launch a job with:
Expand All @@ -114,6 +114,37 @@ $ udata job run job-name arg1 arg2 key1=value key2=value
Most of the time, you won't need it because there will be a dedicated command
to perform the task you need.

You can also schedule or unschedule jobs (and list scheduled jobs):

```shell
$ udata job scheduled
# No scheduled jobs
$ udata job schedule "0 * * * *" count-tags
➢ Scheduled Job count-tags with the following crontab: 0 * * * *
$ udata job scheduled
Count tags: count-tags ↦ 0 * * * *
# Same command to reschedule
$ udata job schedule "1 * * * *" count-tags
➢ Scheduled Job count-tags with the following crontab: 1 * * * *
$ udata job scheduled
Count tags: count-tags ↦ 1 * * * *
$ udata job unschedule count-tags
➢ Unscheduled Job count-tags with the following crontab: 1 * * * *
$ udata job scheduled
# No scheduled jobs
```

Because a job can be scheduled multiple times with different parameters,
you need to provide the same parameters to unschedule:

```shell
$ udata job schedule "0 * * * *" my-job arg key=value
➢ Scheduled Job my-job(arg, key=value) with the following crontab: 0 * * * *
$ udata job unschedule my-job
✘ No scheduled job match Job my-job
$ udata job unschedule my-job arg key=value
➢ Unscheduled Job my-job(arg, key=value) with the following crontab: 0 * * * *
```

## Reindexing data

Expand Down
105 changes: 99 additions & 6 deletions udata/core/jobs/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,24 @@

import click

from udata.commands import cli, exit_with_error
from udata.commands import cli, exit_with_error, echo, white
from udata.tasks import schedulables, celery

from .models import PeriodicTask

log = logging.getLogger(__name__)

SCHEDULE_LINE = '{name}: {label} ↦ {schedule}'


def job_label(name, args, kwargs):
    '''
    Build a human readable label for a job and its parameters.

    The label is the job name followed, when there is at least one
    parameter, by the positional arguments then the keyword arguments
    (sorted by key) between parentheses, ex: ``my-job(arg, key=value)``.

    ``args`` can be any sequence (list or tuple) and parameters values
    are not required to be strings.
    '''
    # Format every parameter instead of joining raw values so that
    # non-string values and tuples of arguments do not raise.
    params = ['{0}'.format(arg) for arg in args]
    params.extend(
        '{0}={1}'.format(key, value) for key, value in sorted(kwargs.items())
    )
    if not params:
        return name
    return '{0}({1})'.format(name, ', '.join(params))


@cli.group('job')
def grp():
Expand Down Expand Up @@ -38,9 +51,7 @@ def run(name, params, delay):
if name not in celery.tasks:
exit_with_error('Job %s not found', name)
job = celery.tasks[name]
label = name
if params:
label += '(' + ', '.join(params) + ')'
label = job_label(name, args, kwargs)
if delay:
log.info('Sending job %s', label)
job.delay(*args, **kwargs)
Expand All @@ -54,5 +65,87 @@ def run(name, params, delay):
@grp.command()
def list():
'''List all availables jobs'''
for job in schedulables():
log.info(job.name)
for job in sorted(schedulables()):
echo(job.name)


@grp.command()
@click.argument('cron', metavar='<cron>')
@click.argument('name', metavar='<name>')
@click.argument('params', nargs=-1, metavar='<arg key=value ...>')
def schedule(cron, name, params):
    '''
    Schedule the job <name> to run periodically given the <cron> expression.

    Jobs args and kwargs are given as parameters without dashes.
    Scheduling again a job with the same parameters replaces its crontab.

    Ex:
        udata job schedule "0 * * * *" my-job arg1 arg2 key1=value key2=value
    '''
    if name not in celery.tasks:
        exit_with_error('Job %s not found', name)

    args = [p for p in params if '=' not in p]
    # Split on the first '=' only: a value is allowed to contain '='
    kwargs = dict(p.split('=', 1) for p in params if '=' in p)
    label = 'Job {0}'.format(job_label(name, args, kwargs))
    # Parse once, before touching the database, whatever the branch below
    crontab = PeriodicTask.Crontab.parse(cron)

    try:
        task = PeriodicTask.objects.get(task=name, args=args, kwargs=kwargs)
    except PeriodicTask.DoesNotExist:
        task = PeriodicTask.objects.create(
            task=name,
            name=label,
            description='Periodic {0} job'.format(name),
            enabled=True,
            args=args,
            kwargs=kwargs,
            crontab=crontab,
        )
    else:
        # Same job with the same parameters: only reschedule it
        task.modify(crontab=crontab)

    msg = 'Scheduled {label} with the following crontab: {cron}'
    log.info(msg.format(label=label, cron=task.schedule_display))


@grp.command()
@click.argument('name', metavar='<name>')
@click.argument('params', nargs=-1, metavar='<arg key=value ...>')
def unschedule(name, params):
    '''
    Unschedule the job <name> with the given parameters.

    Jobs args and kwargs are given as parameters without dashes.
    They need to be the same as those given when the job was scheduled.

    Ex:
        udata job unschedule my-job arg1 arg2 key1=value key2=value
    '''
    if name not in celery.tasks:
        exit_with_error('Job %s not found', name)

    args = [p for p in params if '=' not in p]
    # Split on the first '=' only: a value is allowed to contain '='
    kwargs = dict(p.split('=', 1) for p in params if '=' in p)
    label = 'Job {0}'.format(job_label(name, args, kwargs))

    try:
        task = PeriodicTask.objects.get(task=name, args=args, kwargs=kwargs)
    except PeriodicTask.DoesNotExist:
        exit_with_error('No scheduled job match {0}'.format(label))

    task.delete()
    msg = 'Unscheduled {label} with the following crontab: {cron}'
    log.info(msg.format(label=label, cron=task.schedule_display))


@grp.command()
def scheduled():
    '''
    List scheduled jobs.

    One line is printed for each periodic task registered
    for a schedulable job.
    '''
    for job in sorted(schedulables()):
        tasks = PeriodicTask.objects(task=job.name)
        for task in tasks:
            params_label = job_label(task.task, task.args, task.kwargs)
            line = SCHEDULE_LINE.format(
                name=white(task.name.encode('utf8')),
                label=params_label,
                schedule=task.schedule_display,
            )
            echo(line.encode('utf8'))
11 changes: 11 additions & 0 deletions udata/core/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ class Crontab(BasePeriodicTask.Crontab):
def __unicode__(self):
return CRON.format(**self._data)

@classmethod
def parse(cls, cron):
    '''
    Build an instance from a 5 fields crontab expression.

    Fields are whitespace separated and expected in the following order:
    minute, hour, day of month, month of year and day of week.

    Raises a ValueError if the expression does not have exactly 5 fields.
    '''
    fields = cron.split()
    if len(fields) != 5:
        # Fail with an explicit message rather than an unpacking error
        msg = 'Invalid cron expression "{0}": expected 5 fields, got {1}'
        raise ValueError(msg.format(cron, len(fields)))
    minute, hour, day_of_month, month_of_year, day_of_week = fields
    return cls(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month_of_year,
        day_of_week=day_of_week,
    )

@property
def schedule_display(self):
if self.interval:
Expand Down
5 changes: 0 additions & 5 deletions udata/harvest/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ def backends():
log.info('%s (%s)', backend.name, backend.display_name or backend.name)


@grp.command()
def jobs():
'''List started harvest jobs'''


@grp.command()
@click.argument('identifier')
def launch(identifier):
Expand Down
4 changes: 2 additions & 2 deletions udata/tests/cli/test_db_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ def test_unrecord_with_single_parameter_without_extension(cli, migrations):

def test_unrecord_without_parameters(cli, migrations):
'''Should display help without errors'''
result = cli('db unrecord')
result = cli('db unrecord', check=False)
assert result.exit_code != 0
assert migrations.count() == 1

def test_unrecord_with_too_many_parameters(cli, migrations):
'''Should display help without errors'''
result = cli('db unrecord udata test.js too many')
result = cli('db unrecord udata test.js too many', check=False)
assert result.exit_code != 0
assert migrations.count() == 1
6 changes: 6 additions & 0 deletions udata/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,9 @@ def full_url(*args, **kwargs):
def data_path(filename):
'''Get a test data path'''
return os.path.join(os.path.dirname(__file__), 'data', filename)


def assert_command_ok(result):
    '''
    Assert that a command result has a zero exit code.

    On failure, the assertion message contains
    the exit code and the command output.
    '''
    # Hide this helper frame from pytest tracebacks
    __tracebackhide__ = True
    failure = (
        'Command failed with exit code {0.exit_code} and output:\n{0.output}'
    )
    assert result.exit_code == 0, failure.format(result)
12 changes: 8 additions & 4 deletions udata/tests/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import unicode_literals

import pytest
import shlex
import sys

from contextlib import contextmanager
Expand All @@ -18,7 +19,7 @@
from udata.models import db
from udata.search import es

from .helpers import assert200
from .helpers import assert200, assert_command_ok


class TestClient(FlaskClient):
Expand Down Expand Up @@ -223,16 +224,19 @@ def autoindex(app, clean_db):
@pytest.fixture(name='cli')
def cli_fixture(mocker, app):

def mock_runner(*args):
def mock_runner(*args, **kwargs):
from click.testing import CliRunner
from udata.commands import cli

if len(args) == 1 and ' ' in args[0]:
args = args[0].split()
args = shlex.split(args[0])
runner = CliRunner()
# Avoid instantiating another app and reuse the app fixture
with mocker.patch.object(cli, 'create_app', return_value=app):
return runner.invoke(cli, args, catch_exceptions=False)
result = runner.invoke(cli, args, catch_exceptions=False)
if kwargs.get('check', True):
assert_command_ok(result)
return result

return mock_runner

Expand Down
Loading