Skip to content

Commit

Permalink
Add and use CryptoBackend.get_ordered_csr_identifiers().
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein committed Apr 10, 2024
1 parent 7e33398 commit ad7f8cb
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 16 deletions.
7 changes: 7 additions & 0 deletions changelogs/fragments/725-acme_certificate-order.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
bugfixes:
- "acme_certificate - respect the order of the CNAME and SAN identifiers that are passed on when creating an ACME order
(https://github.com/ansible-collections/community.crypto/issues/723, https://github.com/ansible-collections/community.crypto/pull/725)."
deprecated_features:
- "acme.backends module utils - from community.crypto on, all implementations of ``CryptoBackend`` must override ``get_ordered_csr_identifiers()``.
The current default implementation, which simply sorts the result of ``get_csr_identifiers()``, will then be removed
(https://github.com/ansible-collections/community.crypto/pull/725)."
34 changes: 27 additions & 7 deletions plugins/module_utils/acme/backend_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,31 +298,51 @@ def create_mac_key(self, alg, key):
},
}

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
The list is deduplicated, and if a CNAME is present, it will be returned
as the first element in the result.
'''
identifiers = set([])
if csr_content is None:
csr_content = read_file(csr_filename)
else:
csr_content = to_bytes(csr_content)
csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend)

identifiers = set()
result = []

def add_identifier(identifier):
if identifier in identifiers:
return
identifiers.add(identifier)
result.append(identifier)

for sub in csr.subject:
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
identifiers.add(('dns', sub.value))
add_identifier(('dns', sub.value))
for extension in csr.extensions:
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
for name in extension.value:
if isinstance(name, cryptography.x509.DNSName):
identifiers.add(('dns', name.value))
add_identifier(('dns', name.value))
elif isinstance(name, cryptography.x509.IPAddress):
identifiers.add(('ip', name.value.compressed))
add_identifier(('ip', name.value.compressed))
else:
raise BackendException('Found unsupported SAN identifier {0}'.format(name))
return identifiers
return result

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))

def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Expand Down
35 changes: 27 additions & 8 deletions plugins/module_utils/acme/backend_openssl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ def _normalize_ip(ip):
# We do not want to error out on something IPAddress() cannot parse
return ip

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
The list is deduplicated, and if a CNAME is present, it will be returned
as the first element in the result.
'''
filename = csr_filename
data = None
Expand All @@ -241,24 +244,40 @@ def get_csr_identifiers(self, csr_filename=None, csr_content=None):
dummy, out, dummy = self.module.run_command(
openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)

identifiers = set([])
identifiers = set()
result = []

def add_identifier(identifier):
if identifier in identifiers:
return
identifiers.add(identifier)
result.append(identifier)

common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
if common_name is not None:
identifiers.add(('dns', common_name.group(1)))
add_identifier(('dns', common_name.group(1)))
subject_alt_names = re.search(
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.lower().startswith("dns:"):
identifiers.add(('dns', san[4:]))
add_identifier(('dns', san[4:]))
elif san.lower().startswith("ip:"):
identifiers.add(('ip', self._normalize_ip(san[3:])))
add_identifier(('ip', self._normalize_ip(san[3:])))
elif san.lower().startswith("ip address:"):
identifiers.add(('ip', self._normalize_ip(san[11:])))
add_identifier(('ip', self._normalize_ip(san[11:])))
else:
raise BackendException('Found unsupported SAN identifier "{0}"'.format(san))
return identifiers
return result

def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))

def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
'''
Expand Down
17 changes: 17 additions & 0 deletions plugins/module_utils/acme/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ def sign(self, payload64, protected64, key_data):
def create_mac_key(self, alg, key):
'''Create a MAC key.'''

def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Return a list of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
The list is deduplicated, and if a CNAME is present, it will be returned
as the first element in the result.
'''
module.deprecate(
"Every backend must override the get_ordered_csr_identifiers() method."
" The default implementation will be removed in 3.0.0 and this method will be marked as `abstractmethod` by then.",
version='3.0.0',
collection_name='community.crypto',
)
return sorted(self.get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))

@abc.abstractmethod
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
'''
Expand Down
2 changes: 1 addition & 1 deletion plugins/modules/acme_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def __init__(self, module, backend):
raise ModuleFailException("CSR %s not found" % (self.csr))

# Extract list of identifiers from CSR
self.identifiers = self.client.backend.get_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)
self.identifiers = self.client.backend.get_ordered_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)

def is_first_step(self):
'''
Expand Down

0 comments on commit ad7f8cb

Please sign in to comment.