From ad7f8cb8f210b8cf4b9b82970d91f69bc9b90cc9 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Wed, 10 Apr 2024 22:53:12 +0200 Subject: [PATCH] Add and use CryptoBackend.get_ordered_csr_identifiers(). --- .../fragments/725-acme_certificate-order.yml | 7 ++++ .../module_utils/acme/backend_cryptography.py | 34 ++++++++++++++---- .../module_utils/acme/backend_openssl_cli.py | 35 ++++++++++++++----- plugins/module_utils/acme/backends.py | 17 +++++++++ plugins/modules/acme_certificate.py | 2 +- 5 files changed, 79 insertions(+), 16 deletions(-) create mode 100644 changelogs/fragments/725-acme_certificate-order.yml diff --git a/changelogs/fragments/725-acme_certificate-order.yml b/changelogs/fragments/725-acme_certificate-order.yml new file mode 100644 index 000000000..55a9b489b --- /dev/null +++ b/changelogs/fragments/725-acme_certificate-order.yml @@ -0,0 +1,7 @@ +bugfixes: + - "acme_certificate - respect the order of the CNAME and SAN identifiers that are passed on when creating an ACME order + (https://github.com/ansible-collections/community.crypto/issues/723, https://github.com/ansible-collections/community.crypto/pull/725)." +deprecated_features: + - "acme.backends module utils - from community.crypto on, all implementations of ``CryptoBackend`` must override ``get_ordered_csr_identifiers()``. + The current default implementation, which simply sorts the result of ``get_csr_identifiers()``, will then be removed + (https://github.com/ansible-collections/community.crypto/pull/725)." diff --git a/plugins/module_utils/acme/backend_cryptography.py b/plugins/module_utils/acme/backend_cryptography.py index 2e388980a..5513b78a8 100644 --- a/plugins/module_utils/acme/backend_cryptography.py +++ b/plugins/module_utils/acme/backend_cryptography.py @@ -298,31 +298,51 @@ def create_mac_key(self, alg, key): }, } - def get_csr_identifiers(self, csr_filename=None, csr_content=None): + def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None): ''' - Return a set of requested identifiers (CN and SANs) for the CSR. + Return a list of requested identifiers (CN and SANs) for the CSR. Each identifier is a pair (type, identifier), where type is either 'dns' or 'ip'. + + The list is deduplicated, and if a CNAME is present, it will be returned + as the first element in the result. ''' - identifiers = set([]) if csr_content is None: csr_content = read_file(csr_filename) else: csr_content = to_bytes(csr_content) csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend) + + identifiers = set() + result = [] + + def add_identifier(identifier): + if identifier in identifiers: + return + identifiers.add(identifier) + result.append(identifier) + for sub in csr.subject: if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME: - identifiers.add(('dns', sub.value)) + add_identifier(('dns', sub.value)) for extension in csr.extensions: if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME: for name in extension.value: if isinstance(name, cryptography.x509.DNSName): - identifiers.add(('dns', name.value)) + add_identifier(('dns', name.value)) elif isinstance(name, cryptography.x509.IPAddress): - identifiers.add(('ip', name.value.compressed)) + add_identifier(('ip', name.value.compressed)) else: raise BackendException('Found unsupported SAN identifier {0}'.format(name)) - return identifiers + return result + + def get_csr_identifiers(self, csr_filename=None, csr_content=None): + ''' + Return a set of requested identifiers (CN and SANs) for the CSR. + Each identifier is a pair (type, identifier), where type is either + 'dns' or 'ip'. + ''' + return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content)) def get_cert_days(self, cert_filename=None, cert_content=None, now=None): ''' diff --git a/plugins/module_utils/acme/backend_openssl_cli.py b/plugins/module_utils/acme/backend_openssl_cli.py index dabcbdb3b..9a1ed1f5a 100644 --- a/plugins/module_utils/acme/backend_openssl_cli.py +++ b/plugins/module_utils/acme/backend_openssl_cli.py @@ -225,11 +225,14 @@ def _normalize_ip(ip): # We do not want to error out on something IPAddress() cannot parse return ip - def get_csr_identifiers(self, csr_filename=None, csr_content=None): + def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None): ''' - Return a set of requested identifiers (CN and SANs) for the CSR. + Return a list of requested identifiers (CN and SANs) for the CSR. Each identifier is a pair (type, identifier), where type is either 'dns' or 'ip'. + + The list is deduplicated, and if a CNAME is present, it will be returned + as the first element in the result. ''' filename = csr_filename data = None @@ -241,24 +244,40 @@ def get_csr_identifiers(self, csr_filename=None, csr_content=None): dummy, out, dummy = self.module.run_command( openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) - identifiers = set([]) + identifiers = set() + result = [] + + def add_identifier(identifier): + if identifier in identifiers: + return + identifiers.add(identifier) + result.append(identifier) + common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict')) if common_name is not None: - identifiers.add(('dns', common_name.group(1))) + add_identifier(('dns', common_name.group(1))) subject_alt_names = re.search( r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL) if subject_alt_names is not None: for san in subject_alt_names.group(1).split(", "): if san.lower().startswith("dns:"): - identifiers.add(('dns', san[4:])) + add_identifier(('dns', san[4:])) elif san.lower().startswith("ip:"): - identifiers.add(('ip', self._normalize_ip(san[3:]))) + add_identifier(('ip', self._normalize_ip(san[3:]))) elif san.lower().startswith("ip address:"): - identifiers.add(('ip', self._normalize_ip(san[11:]))) + add_identifier(('ip', self._normalize_ip(san[11:]))) else: raise BackendException('Found unsupported SAN identifier "{0}"'.format(san)) - return identifiers + return result + + def get_csr_identifiers(self, csr_filename=None, csr_content=None): + ''' + Return a set of requested identifiers (CN and SANs) for the CSR. + Each identifier is a pair (type, identifier), where type is either + 'dns' or 'ip'. + ''' + return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content)) def get_cert_days(self, cert_filename=None, cert_content=None, now=None): ''' diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 5c48e1a74..3c1ee5d2d 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -34,6 +34,23 @@ def sign(self, payload64, protected64, key_data): def create_mac_key(self, alg, key): '''Create a MAC key.''' + def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None): + ''' + Return a list of requested identifiers (CN and SANs) for the CSR. + Each identifier is a pair (type, identifier), where type is either + 'dns' or 'ip'. + + The list is deduplicated, and if a CNAME is present, it will be returned + as the first element in the result. + ''' + module.deprecate( + "Every backend must override the get_ordered_csr_identifiers() method." + " The default implementation will be removed in 3.0.0 and this method will be marked as `abstractmethod` by then.", + version='3.0.0', + collection_name='community.crypto', + ) + return sorted(self.get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content)) + @abc.abstractmethod def get_csr_identifiers(self, csr_filename=None, csr_content=None): ''' diff --git a/plugins/modules/acme_certificate.py b/plugins/modules/acme_certificate.py index 9c0b349c4..21a6d6ae9 100644 --- a/plugins/modules/acme_certificate.py +++ b/plugins/modules/acme_certificate.py @@ -661,7 +661,7 @@ def __init__(self, module, backend): raise ModuleFailException("CSR %s not found" % (self.csr)) # Extract list of identifiers from CSR - self.identifiers = self.client.backend.get_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content) + self.identifiers = self.client.backend.get_ordered_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content) def is_first_step(self): '''