Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor lookup_chronos_jobs #24

Merged
merged 9 commits into from
Nov 10, 2015
13 changes: 8 additions & 5 deletions paasta_itests/steps/chronos_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,15 @@ def chronos_check_job_state(context, old_or_new_job, disabled):
job_id = context.old_chronos_job_name
else:
job_id = context.chronos_job_name
(service, instance, git_hash, config_hash) = chronos_tools.decompose_job_id(job_id)
jobs = chronos_tools.lookup_chronos_jobs(
job_id,
context.chronos_client,
max_expected=1,
include_disabled=desired_disabled
service=service,
instance=instance,
git_hash=git_hash,
config_hash=config_hash,
client=context.chronos_client,
include_disabled=desired_disabled,
)
assert jobs != []
assert len(jobs) == 1
for job in jobs:
assert job['disabled'] == desired_disabled
10 changes: 2 additions & 8 deletions paasta_itests/steps/setup_chronos_job_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,9 @@ def old_jobs_leftover(context, job_count):

@then(u'there should be {job_count} enabled jobs')
def should_be_enabled_jobs(context, job_count):
search_pattern = chronos_tools.compose_job_id(
enabled_jobs = chronos_tools.lookup_chronos_jobs(
service=fake_service_name,
instance=fake_instance_name,
)
enabled_jobs = chronos_tools.lookup_chronos_jobs(
pattern=search_pattern,
client=context.chronos_client,
include_disabled=False,
)
Expand All @@ -127,12 +124,9 @@ def should_be_enabled_jobs(context, job_count):

@then(u'there should be {job_count} disabled jobs')
def should_be_disabled_jobs(context, job_count):
search_pattern = chronos_tools.compose_job_id(
all_related_jobs = chronos_tools.lookup_chronos_jobs(
service=fake_service_name,
instance=fake_instance_name,
)
all_related_jobs = chronos_tools.lookup_chronos_jobs(
pattern=search_pattern,
client=context.chronos_client,
include_disabled=True,
)
Expand Down
24 changes: 14 additions & 10 deletions paasta_tools/check_chronos_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,23 @@ def sensu_event_for_last_run_state(state):
return pysensu_yelp.Status.OK


def build_service_job_mapping(configured_jobs, running_jobs):
def build_service_job_mapping(client, configured_jobs):
"""
Create a dict of {(service, instance): [(chronos job, lastrunstate)]}
where the chronos job is any with a matching (service, instance)
in its name and disabled == false
:param client: A Chronos client used for getting the list of running jobs
:param configured_jobs: A list of jobs configured in Paasta, i.e. jobs we
expect to be able to find
:returns: A dict of {(service, instance): [(chronos job, lastrunstate)]}
where the chronos job is any with a matching (service, instance) in its
name and disabled == False
"""
service_job_mapping = {}
for job in configured_jobs:
# find all those jobs in the client.list belonging to each service
matching_jobs = chronos_tools.match_job_names_to_service_instance(job[0], job[1], running_jobs)
# find all the jobs belonging to each service
matching_jobs = chronos_tools.lookup_chronos_jobs(
service=job[0],
instance=job[1],
client=client,
)
# filter the enabled jobs
enabled = chronos_tools.filter_enabled_jobs(matching_jobs)
# get the last run state for the job
Expand Down Expand Up @@ -143,10 +150,7 @@ def main(args):
# get those jobs listed in configs
configured_jobs = chronos_tools.get_chronos_jobs_for_cluster(soa_dir=args.soa_dir)

# get the running jobs
running_jobs = client.list()

service_job_mapping = build_service_job_mapping(configured_jobs, running_jobs)
service_job_mapping = build_service_job_mapping(client, configured_jobs)
for service_instance, job_state_pairs in service_job_mapping.items():
service, instance = service_instance[0], service_instance[1]
sensu_output, sensu_status = sensu_message_status_for_jobs(service, instance, job_state_pairs)
Expand Down
60 changes: 27 additions & 33 deletions paasta_tools/chronos_serviceinit.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,6 @@ def restart_chronos_job(service, instance, job_id, client, cluster, matching_job
start_chronos_job(service, instance, job_id, client, cluster, job_config, emergency)


def get_matching_jobs(client, job_id, all_tags):
"""Use Chronos client `client` to get a list of configured Chronos jobs
related to `job_id`, the full name of the job as calculated by
create_complete_config().

If all_tags is False, fetch only the exact job specified by job_id.

If all_tags is True, fetch all jobs including those with different git and
config hashes (i.e. older versions of jobs associated with a given service
+ instance).

Returns a list of dicts, each representing the configuration of a Chronos
job.
"""
matching_jobs_pattern = r"^UNINITIALIZED PATTERN$"
if all_tags:
(service, instance, _, __) = chronos_tools.decompose_job_id(job_id)
# We add SPACER to the end as an anchor to prevent catching
# "my_service my_job_extra" when looking for "my_service my_job".
matching_jobs_pattern = r"^%s%s" % (chronos_tools.compose_job_id(service, instance), chronos_tools.SPACER)
else:
matching_jobs_pattern = r"^%s" % job_id
matching_jobs = chronos_tools.lookup_chronos_jobs(matching_jobs_pattern, client, include_disabled=True)
return matching_jobs


def get_short_task_id(task_id):
"""Return just the Chronos-generated timestamp section of a Mesos task id."""
return task_id.split(chronos_tools.MESOS_TASK_SPACER)[1]
Expand Down Expand Up @@ -252,10 +226,20 @@ def perform_command(command, service, instance, cluster, verbose, soa_dir):
if command == "start":
start_chronos_job(service, instance, job_id, client, cluster, complete_job_config, emergency=True)
elif command == "stop":
matching_jobs = get_matching_jobs(client, job_id, all_tags=True)
matching_jobs = chronos_tools.lookup_chronos_jobs(
service=service,
instance=instance,
client=client,
include_disabled=True,
)
stop_chronos_job(service, instance, client, cluster, matching_jobs, emergency=True)
elif command == "restart":
matching_jobs = get_matching_jobs(client, job_id, all_tags=True)
matching_jobs = chronos_tools.lookup_chronos_jobs(
service=service,
instance=instance,
client=client,
include_disabled=True,
)
restart_chronos_job(
service,
instance,
Expand All @@ -269,13 +253,23 @@ def perform_command(command, service, instance, cluster, verbose, soa_dir):
elif command == "status":
# Setting up transparent cache for http API calls
requests_cache.install_cache("paasta_serviceinit", backend="memory")
# Verbose mode may want to display information about previous versions and configurations
all_tags = False
# Verbose mode shows previous versions.
if verbose:
all_tags = True
matching_jobs = get_matching_jobs(client, job_id, all_tags)
git_hash = None
config_hash = None
# Non-verbose shows only the version specified via
# create_complete_config.
else:
(_, __, git_hash, config_hash) = chronos_tools.decompose_job_id(job_id)
matching_jobs = chronos_tools.lookup_chronos_jobs(
service=service,
instance=instance,
git_hash=git_hash,
config_hash=config_hash,
client=client,
include_disabled=True,
)
sorted_matching_jobs = chronos_tools.sort_jobs(matching_jobs)
job_config = chronos_tools.load_chronos_job_config(service, instance, cluster, soa_dir=soa_dir)
job_config = chronos_tools.load_chronos_job_config(
service=service,
instance=instance,
Expand Down
73 changes: 45 additions & 28 deletions paasta_tools/chronos_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from paasta_tools.utils import get_default_branch
from paasta_tools.utils import get_docker_url
from paasta_tools.utils import InstanceConfig
from paasta_tools.utils import InvalidJobNameError
from paasta_tools.utils import load_deployments_json
from paasta_tools.utils import load_system_paasta_config
from paasta_tools.utils import PATH_TO_SYSTEM_PAASTA_CONFIG_DIR
Expand Down Expand Up @@ -545,44 +546,60 @@ def get_key(job):
)


def match_job_names_to_service_instance(service, instance, jobs):
"""Given a list of chronos jobs, return those which are associated with a given service and instance."""
matching = []
for job in jobs:
jobs_service, jobs_instance, _, _ = decompose_job_id(job['name'])
if jobs_service == service and jobs_instance == instance:
matching.append(job)
return matching


def lookup_chronos_jobs(pattern, client, max_expected=None, include_disabled=False):
"""Retrieves Chronos jobs with names that match a specified pattern.
def lookup_chronos_jobs(client, service=None, instance=None, git_hash=None, config_hash=None, include_disabled=False):
"""Discovers Chronos jobs and filters them with ``filter_chronos_jobs()``.

:param pattern: a Python style regular expression that the job name will be matched against
(after being passed to re.compile)
:param client: Chronos client object
:param max_expected: maximum number of results that is expected. If exceeded, raises a ValueError.
If unspecified, defaults to no limit.
:param include_disabled: boolean indicating if disabled jobs should be included in matches
:param service: passed on to ``filter_chronos_jobs()``
:param instance: passed on to ``filter_chronos_jobs()``
:param git_hash: passed on to ``filter_chronos_jobs()``
:param config_hash: passed on to ``filter_chronos_jobs()``
:param include_disabled: passed on to ``filter_chronos_jobs()``
:returns: list of job dicts discovered by ``client`` and filtered by
``filter_chronos_jobs()`` using the other parameters
"""
try:
regexp = re.compile(pattern)
except re.error:
raise ValueError("Invalid regex pattern '%s'" % pattern)
jobs = client.list()
return filter_chronos_jobs(
jobs=jobs,
service=service,
instance=instance,
git_hash=git_hash,
config_hash=config_hash,
include_disabled=include_disabled,
)


def filter_chronos_jobs(jobs, service, instance, git_hash, config_hash, include_disabled):
"""Filters a list of Chronos jobs based on several criteria.

:param jobs: a list of jobs, as calculated in ``lookup_chronos_jobs()``
:param service: service we're looking for. If None, don't filter based on this key.
:param instance: instance we're looking for. If None, don't filter based on this key.
:param git_hash: git_hash we're looking for. If None, don't filter based on
this key.
:param config_hash: config_hash we're looking for. If None, don't filter
based on this key.
:param include_disabled: boolean indicating if disabled jobs should be
included in the returned list
:returns: list of job dicts whose name matches the arguments (if any)
provided
"""
matching_jobs = []
for job in jobs:
if regexp.search(job['name']):
try:
(job_service, job_instance, job_git_hash, job_config_hash) = decompose_job_id(job['name'])
except InvalidJobNameError:
continue
if (
(service is None or job_service == service)
and (instance is None or job_instance == instance)
and (git_hash is None or job_git_hash == git_hash)
and (config_hash is None or job_config_hash == config_hash)
):
if job['disabled'] and not include_disabled:
continue
else:
matching_jobs.append(job)

if max_expected and len(matching_jobs) > max_expected:
matching_ids = [job['name'] for job in matching_jobs]
raise ValueError("Found %d jobs for pattern '%s', but max_expected is set to %d (ids: %s)" %
(len(matching_jobs), pattern, max_expected, ', '.join(matching_ids)))

return matching_jobs


Expand Down
7 changes: 5 additions & 2 deletions paasta_tools/setup_chronos_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ def bounce_chronos_job(

def setup_job(service, instance, chronos_job_config, complete_job_config, client, cluster):
job_id = complete_job_config['name']
all_existing_jobs = chronos_tools.match_job_names_to_service_instance(
service=service, instance=instance, jobs=client.list())
all_existing_jobs = chronos_tools.lookup_chronos_jobs(
service=service,
instance=instance,
client=client,
)
# TODO: Sort the jobs in the right order so we delete the least relevant
# This currently depends on implicit behavior that Chronos returns jobs
# "oldest first"
Expand Down
23 changes: 13 additions & 10 deletions tests/test_check_chronos_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,23 @@ def test_sensu_event_for_last_run_state_invalid():
check_chronos_jobs.sensu_event_for_last_run_state(100)


@patch('check_chronos_jobs.chronos_tools.match_job_names_to_service_instance')
@patch('check_chronos_jobs.chronos_tools.filter_enabled_jobs')
@patch('check_chronos_jobs.chronos_tools.get_status_last_run')
def test_build_service_job_mapping(mock_last_run_state, mock_filter_enabled_jobs, mock_match_job_names):
mock_match_job_names.side_effect = [[{}, {}, {}] for x in range(0, 3)]
mock_filter_enabled_jobs.side_effect = [[{}, {}, {}] for x in range(0, 3)]
mock_last_run_state.side_effect = [
@patch('check_chronos_jobs.chronos_tools.lookup_chronos_jobs', autospec=True)
@patch('check_chronos_jobs.chronos_tools.filter_enabled_jobs', autospec=True)
@patch('check_chronos_jobs.chronos_tools.get_status_last_run', autospec=True)
def test_build_service_job_mapping(mock_last_run_state, mock_filter_enabled_jobs, mock_lookup_chronos_jobs):
# iter() is a workaround
# (http://lists.idyll.org/pipermail/testing-in-python/2013-April/005527.html)
# for a bug in mock (http://bugs.python.org/issue17826)
mock_lookup_chronos_jobs.side_effect = iter([[{}, {}, {}] for x in range(0, 3)])
mock_filter_enabled_jobs.side_effect = iter([[{}, {}, {}] for x in range(0, 3)])
mock_last_run_state.side_effect = iter([
('faketimestamp', chronos_tools.LastRunState.Success),
('faketimestamp', chronos_tools.LastRunState.Fail),
('faketimestamp', chronos_tools.LastRunState.NotRun),
] * 3
] * 3)

fake_configured_jobs = [('service1', 'main'), ('service2', 'main'), ('service3', 'main')]
fake_running_jobs = [('service1', 'main'), ('service2', 'main'), ('service3', 'main')]
fake_client = Mock(list=Mock(return_value=[('service1', 'main'), ('service2', 'main'), ('service3', 'main')]))

expected_job_states = [
({}, chronos_tools.LastRunState.Success),
Expand All @@ -111,7 +114,7 @@ def test_build_service_job_mapping(mock_last_run_state, mock_filter_enabled_jobs
('service2', 'main'): expected_job_states,
('service3', 'main'): expected_job_states,
}
assert check_chronos_jobs.build_service_job_mapping(fake_configured_jobs, fake_running_jobs) == expected
assert check_chronos_jobs.build_service_job_mapping(fake_client, fake_configured_jobs) == expected


def test_message_for_status_fail():
Expand Down
18 changes: 0 additions & 18 deletions tests/test_chronos_serviceinit.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,6 @@ def test_stop_chronos_job():
mock_client.delete_tasks.assert_any_call(job['name'])


def test_get_matching_jobs_all_tags_true():
job_id = 'my_service my_instance gityourmom configyourdad'
client = 'unused'
expected_pattern = r'^my_service my_instance ' # Trailing space is important!
with mock.patch('chronos_serviceinit.chronos_tools.lookup_chronos_jobs') as mock_lookup_chronos_jobs:
chronos_serviceinit.get_matching_jobs(client, job_id, all_tags=True)
mock_lookup_chronos_jobs.assert_called_once_with(expected_pattern, client, include_disabled=True)


def test_get_matching_jobs_all_tags_false():
job_id = 'my_service my_instance gityourmom configyourdad'
client = 'unused'
expected_pattern = r'^my_service my_instance gityourmom configyourdad'
with mock.patch('chronos_serviceinit.chronos_tools.lookup_chronos_jobs') as mock_lookup_chronos_jobs:
chronos_serviceinit.get_matching_jobs(client, job_id, all_tags=False)
mock_lookup_chronos_jobs.assert_called_once_with(expected_pattern, client, include_disabled=True)


def test_get_short_task_id():
task_id = 'ct:1111111111111:0:my_service my_instance gityourmom configyourdad:'
assert chronos_serviceinit.get_short_task_id(task_id) == '1111111111111'
Expand Down
Loading