Skip to content

Commit

Permalink
Add time helpers to ACME backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein committed May 2, 2024
1 parent 4bf6637 commit c75881f
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 19 deletions.
32 changes: 30 additions & 2 deletions plugins/module_utils/acme/backend_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import base64
import binascii
import datetime
import os
import traceback

Expand All @@ -21,6 +22,7 @@
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
_parse_acme_timestamp,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
Expand Down Expand Up @@ -53,10 +55,16 @@
extract_first_pem,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
parse_name_field,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_now_datetime,
parse_name_field,
UTC,
)

CRYPTOGRAPHY_MINIMAL_VERSION = '1.5'
Expand Down Expand Up @@ -173,6 +181,26 @@ class CryptographyBackend(CryptoBackend):
def __init__(self, module):
super(CryptographyBackend, self).__init__(module)

def get_now(self):
return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)

def parse_acme_timestamp(self, timestamp_str):
return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE)

def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)
end = get_epoch_seconds(timestamp_end)
return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE)

def get_utc_datetime(self, *args, **kwargs):
kwargs_ext = dict(kwargs)
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8):
kwargs_ext['tzinfo'] = UTC
result = datetime.datetime(*args, **kwargs_ext)
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8):
result = ensure_utc_timezone(result)
return result

def parse_key(self, key_file=None, key_content=None, passphrase=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
Expand Down Expand Up @@ -379,7 +407,7 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))

if now is None:
now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
now = self.get_now()
elif CRYPTOGRAPHY_TIMEZONE:
now = ensure_utc_timezone(now)
return (get_not_valid_after(cert) - now).days
Expand Down
40 changes: 40 additions & 0 deletions plugins/module_utils/acme/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,22 @@

from collections import namedtuple
import abc
import datetime

from ansible.module_utils import six

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_now_datetime,
remove_timezone,
)


CertificateInformation = namedtuple(
'CertificateInformation',
Expand All @@ -31,11 +40,42 @@
)


def _parse_acme_timestamp(timestamp_str, with_timezone):
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'):
# Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491
try:
result = datetime.datetime.strptime(timestamp_str, format)
except ValueError:
pass
else:
return ensure_utc_timezone(result) if with_timezone else remove_timezone(result)
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))


@six.add_metaclass(abc.ABCMeta)
class CryptoBackend(object):
def __init__(self, module):
self.module = module

def get_now(self):
return get_now_datetime(with_timezone=False)

def parse_acme_timestamp(self, timestamp_str):
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
return _parse_acme_timestamp(timestamp_str, with_timezone=False)

def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)
end = get_epoch_seconds(timestamp_end)
return from_epoch_seconds(start + percentage * (end - start), with_timezone=False)

def get_utc_datetime(self, *args, **kwargs):
result = datetime.datetime(*args, **kwargs)
if 'tzinfo' in kwargs or len(args) >= 8:
result = remove_timezone(result)
return result

@abc.abstractmethod
def parse_key(self, key_file=None, key_content=None, passphrase=None):
'''
Expand Down
10 changes: 5 additions & 5 deletions plugins/module_utils/crypto/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
identify_pem_format,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
from ansible_collections.community.crypto.plugins.module_utils.time import ( # noqa: F401, pylint: disable=unused-import
# These imports are for backwards compatibility
get_now_datetime, # noqa: F401, pylint: disable=unused-import
ensure_utc_timezone, # noqa: F401, pylint: disable=unused-import
convert_relative_to_datetime, # noqa: F401, pylint: disable=unused-import
get_relative_time_option, # noqa: F401, pylint: disable=unused-import
get_now_datetime,
ensure_utc_timezone,
convert_relative_to_datetime,
get_relative_time_option,
)

try:
Expand Down
27 changes: 17 additions & 10 deletions plugins/module_utils/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
__metaclass__ = type


import abc
import datetime
import errno
import hashlib
import os
import re
import sys

from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_native, to_bytes
from ansible.module_utils.common.text.converters import to_native

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
Expand Down Expand Up @@ -56,6 +52,9 @@ def get_now_datetime(with_timezone):
def ensure_utc_timezone(timestamp):
if timestamp.tzinfo is UTC:
return timestamp
if timestamp.tzinfo is None:
# We assume that naive datetime objects use timezone UTC!
return timestamp.replace(tzinfo=UTC)
return timestamp.astimezone(UTC)


Expand All @@ -72,11 +71,19 @@ def add_or_remove_timezone(timestamp, with_timezone):
return ensure_utc_timezone(timestamp) if with_timezone else remove_timezone(timestamp)


def get_epoch_seconds(timestamp):
try:
if sys.version_info < (3, 3):
def get_epoch_seconds(timestamp):
epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC if timestamp.tzinfo is not None else None)
delta = timestamp - epoch
try:
return delta.total_seconds()
except AttributeError:
# Python 2.6 and earlier: total_seconds() does not yet exist, so we use the formula from
# https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds
return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6
else:
def get_epoch_seconds(timestamp):
return timestamp.timestamp()
except AttributeError:
return (ensure_utc_timezone(timestamp) - datetime(1970, 1, 1, tzinfo=UTC)).total_seconds()


def from_epoch_seconds(timestamp, with_timezone):
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/plugins/module_utils/acme/backend_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import base64
import datetime
import os
import sys

from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
Expand Down Expand Up @@ -107,6 +108,52 @@ def load_fixture(name):
]


TEST_PARSE_ACME_TIMESTAMP = [
(
'2024-01-01T00:11:22Z',
dict(year=2024, month=1, day=1, hour=0, minute=11, second=22),
),
(
'2024-01-01T00:11:22.123Z',
dict(year=2024, month=1, day=1, hour=0, minute=11, second=22, microsecond=123000),
),
]

if sys.version_info >= (3, 5):
TEST_PARSE_ACME_TIMESTAMP.extend([
(
'2024-01-01T00:11:22+0100',
dict(year=2023, month=12, day=31, hour=23, minute=11, second=22),
),
(
'2024-01-01T00:11:22.123+0100',
dict(year=2023, month=12, day=31, hour=23, minute=11, second=22, microsecond=123000),
),
])


TEST_INTERPOLATE_TIMESTAMP = [
(
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
0.0,
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
),
(
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
0.5,
dict(year=2024, month=1, day=1, hour=0, minute=30, second=0),
),
(
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
1.0,
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
),
]


class FakeBackend(CryptoBackend):
def parse_key(self, key_file=None, key_content=None, passphrase=None):
raise BackendException('Not implemented in fake backend')
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/plugins/module_utils/acme/test_backend_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
TEST_CERT,
TEST_CERT_DAYS,
TEST_CERT_INFO,
TEST_PARSE_ACME_TIMESTAMP,
TEST_INTERPOLATE_TIMESTAMP,
)


Expand Down Expand Up @@ -92,3 +94,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output,
assert cert_info == expected_cert_info
cert_info = backend.get_cert_information(cert_content=cert_content)
assert cert_info == expected_cert_info


def test_now():
module = MagicMock()
backend = CryptographyBackend(module)
now = backend.get_now()
assert CRYPTOGRAPHY_TIMEZONE == (now.tzinfo is not None)


@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP)
def test_parse_acme_timestamp(input, expected):
module = MagicMock()
backend = CryptographyBackend(module)
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.parse_acme_timestamp(input)
assert ts_expected == timestamp


@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP)
def test_interpolate_timestamp(start, end, percentage, expected):
module = MagicMock()
backend = CryptographyBackend(module)
ts_start = backend.get_utc_datetime(**start)
ts_end = backend.get_utc_datetime(**end)
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage)
assert ts_expected == timestamp
29 changes: 29 additions & 0 deletions tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
TEST_CERT_OPENSSL_OUTPUT,
TEST_CERT_DAYS,
TEST_CERT_INFO,
TEST_PARSE_ACME_TIMESTAMP,
TEST_INTERPOLATE_TIMESTAMP,
)


Expand Down Expand Up @@ -91,3 +93,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output,
assert cert_info == expected_cert_info
cert_info = backend.get_cert_information(cert_content=cert_content)
assert cert_info == expected_cert_info


def test_now():
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
now = backend.get_now()
assert now.tzinfo is None


@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP)
def test_parse_acme_timestamp(input, expected):
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.parse_acme_timestamp(input)
assert ts_expected == timestamp


@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP)
def test_interpolate_timestamp(start, end, percentage, expected):
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
ts_start = backend.get_utc_datetime(**start)
ts_end = backend.get_utc_datetime(**end)
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage)
assert ts_expected == timestamp
2 changes: 0 additions & 2 deletions tests/unit/plugins/module_utils/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@
]



if sys.version_info >= (3, 5):
ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1))

Expand Down Expand Up @@ -322,4 +321,3 @@ def test_convert_relative_to_datetime(relative_time_string, with_timezone, now,
def test_get_relative_time_option(input_string, input_name, backend, with_timezone, now, expected):
output = get_relative_time_option(input_string, input_name, backend=backend, with_timezone=with_timezone, now=now)
assert expected == output

0 comments on commit c75881f

Please sign in to comment.