Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

acme_certificate: respect order of CNAME/SAN entries of CSR #725

Merged
merged 1 commit into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelogs/fragments/725-acme_certificate-order.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
bugfixes:
- "acme_certificate - respect the order of the CNAME and SAN identifiers that are passed on when creating an ACME order
(https://github.com/ansible-collections/community.crypto/issues/723, https://github.com/ansible-collections/community.crypto/pull/725)."
deprecated_features:
- "acme.backends module utils - from community.crypto on, all implementations of ``CryptoBackend`` must override ``get_ordered_csr_identifiers()``.
The current default implementation, which simply sorts the result of ``get_csr_identifiers()``, will then be removed
(https://github.com/ansible-collections/community.crypto/pull/725)."
34 changes: 27 additions & 7 deletions plugins/module_utils/acme/backend_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,31 +298,51 @@ def create_mac_key(self, alg, key):
},
}

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.

The list is deduplicated, and if a CNAME is present, it will be returned
as the first element in the result.
'''
identifiers = set([])
if csr_content is None:
csr_content = read_file(csr_filename)
else:
csr_content = to_bytes(csr_content)
csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend)

identifiers = set()
result = []

def add_identifier(identifier):
if identifier in identifiers:
return
identifiers.add(identifier)
result.append(identifier)

for sub in csr.subject:
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
identifiers.add(('dns', sub.value))
add_identifier(('dns', sub.value))
for extension in csr.extensions:
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
for name in extension.value:
if isinstance(name, cryptography.x509.DNSName):
identifiers.add(('dns', name.value))
add_identifier(('dns', name.value))
elif isinstance(name, cryptography.x509.IPAddress):
identifiers.add(('ip', name.value.compressed))
add_identifier(('ip', name.value.compressed))
else:
raise BackendException('Found unsupported SAN identifier {0}'.format(name))
return identifiers
return result

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))

def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Expand Down
35 changes: 27 additions & 8 deletions plugins/module_utils/acme/backend_openssl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ def _normalize_ip(ip):
# We do not want to error out on something IPAddress() cannot parse
return ip

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.

The list is deduplicated, and if a CNAME is present, it will be returned
as the first element in the result.
'''
filename = csr_filename
data = None
Expand All @@ -241,24 +244,40 @@ def get_csr_identifiers(self, csr_filename=None, csr_content=None):
dummy, out, dummy = self.module.run_command(
openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)

identifiers = set([])
identifiers = set()
result = []

def add_identifier(identifier):
if identifier in identifiers:
return
identifiers.add(identifier)
result.append(identifier)

common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
if common_name is not None:
identifiers.add(('dns', common_name.group(1)))
add_identifier(('dns', common_name.group(1)))
subject_alt_names = re.search(
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.lower().startswith("dns:"):
identifiers.add(('dns', san[4:]))
add_identifier(('dns', san[4:]))
elif san.lower().startswith("ip:"):
identifiers.add(('ip', self._normalize_ip(san[3:])))
add_identifier(('ip', self._normalize_ip(san[3:])))
elif san.lower().startswith("ip address:"):
identifiers.add(('ip', self._normalize_ip(san[11:])))
add_identifier(('ip', self._normalize_ip(san[11:])))
else:
raise BackendException('Found unsupported SAN identifier "{0}"'.format(san))
return identifiers
return result

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))

def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Expand Down
17 changes: 17 additions & 0 deletions plugins/module_utils/acme/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ def sign(self, payload64, protected64, key_data):
def create_mac_key(self, alg, key):
'''Create a MAC key.'''

def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.

The list is deduplicated, and if a CNAME is present, it will be returned
as the first element in the result.
'''
self.module.deprecate(
"Every backend must override the get_ordered_csr_identifiers() method."
" The default implementation will be removed in 3.0.0 and this method will be marked as `abstractmethod` by then.",
version='3.0.0',
collection_name='community.crypto',
)
return sorted(self.get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))

@abc.abstractmethod
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Expand Down
2 changes: 1 addition & 1 deletion plugins/modules/acme_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def __init__(self, module, backend):
raise ModuleFailException("CSR %s not found" % (self.csr))

# Extract list of identifiers from CSR
self.identifiers = self.client.backend.get_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)
self.identifiers = self.client.backend.get_ordered_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)

def is_first_step(self):
'''
Expand Down