diff --git a/plugins/module_utils/acme/backend_cryptography.py b/plugins/module_utils/acme/backend_cryptography.py index 4e6438a3f..9f7770aea 100644 --- a/plugins/module_utils/acme/backend_cryptography.py +++ b/plugins/module_utils/acme/backend_cryptography.py @@ -11,6 +11,7 @@ import base64 import binascii +import datetime import os import traceback @@ -21,6 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( CertificateInformation, CryptoBackend, + _parse_acme_timestamp, ) from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import ( @@ -53,10 +55,16 @@ extract_first_pem, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + parse_name_field, +) + from ansible_collections.community.crypto.plugins.module_utils.time import ( ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, get_now_datetime, - parse_name_field, + UTC, ) CRYPTOGRAPHY_MINIMAL_VERSION = '1.5' @@ -173,6 +181,26 @@ class CryptographyBackend(CryptoBackend): def __init__(self, module): super(CryptographyBackend, self).__init__(module) + def get_now(self): + return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def parse_acme_timestamp(self, timestamp_str): + return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage): + start = get_epoch_seconds(timestamp_start) + end = get_epoch_seconds(timestamp_end) + return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def get_utc_datetime(self, *args, **kwargs): + kwargs_ext = dict(kwargs) + if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8): + kwargs_ext['tzinfo'] = UTC + result = datetime.datetime(*args, **kwargs_ext) + if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8): + result = ensure_utc_timezone(result) + return result + def parse_key(self, key_file=None, key_content=None, passphrase=None): ''' Parses an RSA or Elliptic Curve key file in PEM format and returns key_data. @@ -379,7 +407,7 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e)) if now is None: - now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + now = self.get_now() elif CRYPTOGRAPHY_TIMEZONE: now = ensure_utc_timezone(now) return (get_not_valid_after(cert) - now).days diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 78ff0f181..2db282667 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -11,6 +11,7 @@ from collections import namedtuple import abc +import datetime from ansible.module_utils import six @@ -18,6 +19,14 @@ BackendException, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, + get_now_datetime, + remove_timezone, +) + CertificateInformation = namedtuple( 'CertificateInformation', @@ -31,11 +40,42 @@ ) +def _parse_acme_timestamp(timestamp_str, with_timezone): + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'): + # Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491 + try: + result = datetime.datetime.strptime(timestamp_str, format) + except ValueError: + pass + else: + return ensure_utc_timezone(result) if with_timezone else remove_timezone(result) + raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str)) + + @six.add_metaclass(abc.ABCMeta) class CryptoBackend(object): def __init__(self, module): self.module = module + def get_now(self): + return get_now_datetime(with_timezone=False) + + def parse_acme_timestamp(self, timestamp_str): + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + return _parse_acme_timestamp(timestamp_str, with_timezone=False) + + def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage): + start = get_epoch_seconds(timestamp_start) + end = get_epoch_seconds(timestamp_end) + return from_epoch_seconds(start + percentage * (end - start), with_timezone=False) + + def get_utc_datetime(self, *args, **kwargs): + result = datetime.datetime(*args, **kwargs) + if 'tzinfo' in kwargs or len(args) >= 8: + result = remove_timezone(result) + return result + @abc.abstractmethod def parse_key(self, key_file=None, key_content=None, passphrase=None): ''' diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index 44c72b775..862f5b8fc 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -20,12 +20,12 @@ identify_pem_format, ) -from ansible_collections.community.crypto.plugins.module_utils.time import ( +from ansible_collections.community.crypto.plugins.module_utils.time import ( # noqa: F401, pylint: disable=unused-import # These imports are for backwards compatibility - get_now_datetime, # noqa: F401, pylint: disable=unused-import - ensure_utc_timezone, # noqa: F401, pylint: disable=unused-import - convert_relative_to_datetime, # noqa: F401, pylint: disable=unused-import - get_relative_time_option, # noqa: F401, pylint: disable=unused-import + get_now_datetime, + ensure_utc_timezone, + convert_relative_to_datetime, + get_relative_time_option, ) try: diff --git a/plugins/module_utils/time.py b/plugins/module_utils/time.py index a5098eea1..4adc4620e 100644 --- a/plugins/module_utils/time.py +++ b/plugins/module_utils/time.py @@ -8,15 +8,11 @@ __metaclass__ = type -import abc import datetime -import errno -import hashlib -import os import re +import sys -from ansible.module_utils import six -from ansible.module_utils.common.text.converters import to_native, to_bytes +from ansible.module_utils.common.text.converters import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, @@ -56,6 +52,9 @@ def get_now_datetime(with_timezone): def ensure_utc_timezone(timestamp): if timestamp.tzinfo is UTC: return timestamp + if timestamp.tzinfo is None: + # We assume that naive datetime objects use timezone UTC! + return timestamp.replace(tzinfo=UTC) return timestamp.astimezone(UTC) @@ -72,11 +71,19 @@ def add_or_remove_timezone(timestamp, with_timezone): return ensure_utc_timezone(timestamp) if with_timezone else remove_timezone(timestamp) -def get_epoch_seconds(timestamp): - try: +if sys.version_info < (3, 3): + def get_epoch_seconds(timestamp): + epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC if timestamp.tzinfo is not None else None) + delta = timestamp - epoch + try: + return delta.total_seconds() + except AttributeError: + # Python 2.6 and earlier: total_seconds() does not yet exist, so we use the formula from + # https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds + return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6 +else: + def get_epoch_seconds(timestamp): return timestamp.timestamp() - except AttributeError: - return (ensure_utc_timezone(timestamp) - datetime(1970, 1, 1, tzinfo=UTC)).total_seconds() def from_epoch_seconds(timestamp, with_timezone): diff --git a/tests/unit/plugins/module_utils/acme/backend_data.py b/tests/unit/plugins/module_utils/acme/backend_data.py index 31e0ef006..eed1fb3fa 100644 --- a/tests/unit/plugins/module_utils/acme/backend_data.py +++ b/tests/unit/plugins/module_utils/acme/backend_data.py @@ -9,6 +9,7 @@ import base64 import datetime import os +import sys from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( CertificateInformation, @@ -107,6 +108,52 @@ def load_fixture(name): ] +TEST_PARSE_ACME_TIMESTAMP = [ + ( + '2024-01-01T00:11:22Z', + dict(year=2024, month=1, day=1, hour=0, minute=11, second=22), + ), + ( + '2024-01-01T00:11:22.123Z', + dict(year=2024, month=1, day=1, hour=0, minute=11, second=22, microsecond=123000), + ), +] + +if sys.version_info >= (3, 5): + TEST_PARSE_ACME_TIMESTAMP.extend([ + ( + '2024-01-01T00:11:22+0100', + dict(year=2023, month=12, day=31, hour=23, minute=11, second=22), + ), + ( + '2024-01-01T00:11:22.123+0100', + dict(year=2023, month=12, day=31, hour=23, minute=11, second=22, microsecond=123000), + ), + ]) + + +TEST_INTERPOLATE_TIMESTAMP = [ + ( + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + 0.0, + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + ), + ( + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + 0.5, + dict(year=2024, month=1, day=1, hour=0, minute=30, second=0), + ), + ( + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + 1.0, + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + ), +] + + class FakeBackend(CryptoBackend): def parse_key(self, key_file=None, key_content=None, passphrase=None): raise BackendException('Not implemented in fake backend') diff --git a/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py b/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py index c3b713ee6..9186e2430 100644 --- a/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py +++ b/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py @@ -30,6 +30,8 @@ TEST_CERT, TEST_CERT_DAYS, TEST_CERT_INFO, + TEST_PARSE_ACME_TIMESTAMP, + TEST_INTERPOLATE_TIMESTAMP, ) @@ -92,3 +94,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output, assert cert_info == expected_cert_info cert_info = backend.get_cert_information(cert_content=cert_content) assert cert_info == expected_cert_info + + +def test_now(): + module = MagicMock() + backend = CryptographyBackend(module) + now = backend.get_now() + assert CRYPTOGRAPHY_TIMEZONE == (now.tzinfo is not None) + + +@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP) +def test_parse_acme_timestamp(input, expected): + module = MagicMock() + backend = CryptographyBackend(module) + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.parse_acme_timestamp(input) + assert ts_expected == timestamp + + +@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP) +def test_interpolate_timestamp(start, end, percentage, expected): + module = MagicMock() + backend = CryptographyBackend(module) + ts_start = backend.get_utc_datetime(**start) + ts_end = backend.get_utc_datetime(**end) + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage) + assert ts_expected == timestamp diff --git a/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py b/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py index c0a108611..5138a6202 100644 --- a/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py +++ b/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py @@ -22,6 +22,8 @@ TEST_CERT_OPENSSL_OUTPUT, TEST_CERT_DAYS, TEST_CERT_INFO, + TEST_PARSE_ACME_TIMESTAMP, + TEST_INTERPOLATE_TIMESTAMP, ) @@ -91,3 +93,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output, assert cert_info == expected_cert_info cert_info = backend.get_cert_information(cert_content=cert_content) assert cert_info == expected_cert_info + + +def test_now(): + module = MagicMock() + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + now = backend.get_now() + assert now.tzinfo is None + + +@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP) +def test_parse_acme_timestamp(input, expected): + module = MagicMock() + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.parse_acme_timestamp(input) + assert ts_expected == timestamp + + +@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP) +def test_interpolate_timestamp(start, end, percentage, expected): + module = MagicMock() + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + ts_start = backend.get_utc_datetime(**start) + ts_end = backend.get_utc_datetime(**end) + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage) + assert ts_expected == timestamp diff --git a/tests/unit/plugins/module_utils/test_time.py b/tests/unit/plugins/module_utils/test_time.py index 72fd4e5c8..35a83f4e4 100644 --- a/tests/unit/plugins/module_utils/test_time.py +++ b/tests/unit/plugins/module_utils/test_time.py @@ -198,7 +198,6 @@ ] - if sys.version_info >= (3, 5): ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1)) @@ -322,4 +321,3 @@ def test_convert_relative_to_datetime(relative_time_string, with_timezone, now, def test_get_relative_time_option(input_string, input_name, backend, with_timezone, now, expected): output = get_relative_time_option(input_string, input_name, backend=backend, with_timezone=with_timezone, now=now) assert expected == output -