Skip to content

Commit

Permalink
Add x509_certificate_convert module. (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein authored Apr 18, 2024
1 parent ae548de commit 27a9ff1
Show file tree
Hide file tree
Showing 13 changed files with 680 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ If you use the Ansible package and do not update collections independently, use
- openssl_signature_info module
- openssl_signature module
- split_pem filter
- x509_certificate_convert module
- x509_certificate_info module and filter
- x509_certificate_pipe module
- x509_certificate module
Expand Down
29 changes: 29 additions & 0 deletions plugins/module_utils/crypto/pem.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


PEM_START = '-----BEGIN '
PEM_END_START = '-----END '
PEM_END = '-----'
PKCS8_PRIVATEKEY_NAMES = ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY')
PKCS1_PRIVATEKEY_SUFFIX = ' PRIVATE KEY'
Expand Down Expand Up @@ -77,3 +78,31 @@ def extract_first_pem(text):
if not all_pems:
return None
return all_pems[0]


def _extract_type(line, start=PEM_START):
if not line.startswith(start):
return None
if not line.endswith(PEM_END):
return None
return line[len(start):-len(PEM_END)]


def extract_pem(content, strict=False):
lines = content.splitlines()
if len(lines) < 3:
raise ValueError('PEM must have at least 3 lines, have only {count}'.format(count=len(lines)))
header_type = _extract_type(lines[0])
if header_type is None:
raise ValueError('First line is not of format {start}...{end}: {line!r}'.format(start=PEM_START, end=PEM_END, line=lines[0]))
footer_type = _extract_type(lines[-1], start=PEM_END_START)
if strict:
if header_type != footer_type:
raise ValueError('Header type ({header}) is different from footer type ({footer})'.format(header=header_type, footer=footer_type))
for idx, line in enumerate(lines[1:-2]):
if len(line) != 64:
raise ValueError('Line {idx} has length {len} instead of 64'.format(idx=idx, len=len(line)))
if not (0 < len(lines[-2]) <= 64):
raise ValueError('Last line has length {len}, should be in (0, 64]'.format(len=len(lines[-2])))
content = lines[1:-1]
return header_type, ''.join(content)
280 changes: 280 additions & 0 deletions plugins/modules/x509_certificate_convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright (c) 2024, Felix Fontein <[email protected]>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function
__metaclass__ = type


DOCUMENTATION = r'''
---
module: x509_certificate_convert
short_description: Convert X.509 certificates
version_added: 2.19.0
description:
- This module allows to convert X.509 certificates between different formats.
author:
- Felix Fontein (@felixfontein)
extends_documentation_fragment:
- ansible.builtin.files
- community.crypto.attributes
- community.crypto.attributes.files
attributes:
check_mode:
support: full
diff_mode:
support: none
safe_file_operations:
support: full
options:
src_path:
description:
- Name of the file containing the X.509 certificate to convert.
- Exactly one of O(src_path) or O(src_content) must be specified.
type: path
src_content:
description:
- The content of the file containing the X.509 certificate to convert.
- This must be text. If you are not sure that the input file is PEM, you must Base64 encode
the value and set O(src_content_base64=true). You can use the
P(ansible.builtin.b64encode#filter) filter plugin for this.
- Exactly one of O(src_path) or O(src_content) must be specified.
type: str
src_content_base64:
description:
- If set to V(true) when O(src_content) is provided, the module assumes that the value
of O(src_content) is Base64 encoded.
type: bool
default: false
format:
description:
- Determines which format the destination X.509 certificate should be written in.
- Please note that not every key can be exported in any format, and that not every
format supports encryption.
type: str
choices:
- pem
- der
required: true
strict:
description:
- If the input is a PEM file, ensure that it contains a single PEM object, that
the header and footer match, and are of type C(CERTIFICATE) or C(X509 CERTIFICATE).
type: bool
default: false
dest_path:
description:
- Name of the file in which the generated TLS/SSL X.509 certificate will be written.
type: path
required: true
backup:
description:
- Create a backup file including a timestamp so you can get
the original X.509 certificate back if you overwrote it with a new one by accident.
type: bool
default: false
seealso:
- plugin: ansible.builtin.b64encode
plugin_type: filter
- module: community.crypto.x509_certificate
- module: community.crypto.x509_certificate_pipe
- module: community.crypto.x509_certificate_info
'''

EXAMPLES = r'''
- name: Convert PEM X.509 certificate to DER format
community.crypto.x509_certificate_convert:
src_path: /etc/ssl/cert/ansible.com.pem
dest_path: /etc/ssl/cert/ansible.com.der
format: der
'''

RETURN = r'''
backup_file:
description: Name of backup file created.
returned: changed and if O(backup) is V(true)
type: str
sample: /path/to/cert.pem.2019-03-09@11:22~
'''

import base64
import os

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native, to_bytes, to_text

from ansible_collections.community.crypto.plugins.module_utils.io import (
load_file_if_exists,
write_file,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
PEM_START,
PEM_END_START,
PEM_END,
identify_pem_format,
split_pem_list,
extract_pem,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
OpenSSLObject,
)


def parse_certificate(input, strict=False):
input_format = 'pem' if identify_pem_format(input) else 'der'
if input_format == 'pem':
pems = split_pem_list(to_text(input))
if len(pems) > 1 and strict:
raise ValueError('The input contains {count} PEM objects, expecting only one since strict=true'.format(count=len(pems)))
pem_header_type, content = extract_pem(pems[0], strict=strict)
if strict and pem_header_type not in ('CERTIFICATE', 'X509 CERTIFICATE'):
raise ValueError('type is {type!r}, expecting CERTIFICATE or X509 CERTIFICATE'.format(type=pem_header_type))
input = base64.b64decode(content)
else:
pem_header_type = None
return input, input_format, pem_header_type


class X509CertificateConvertModule(OpenSSLObject):
def __init__(self, module):
super(X509CertificateConvertModule, self).__init__(
module.params['dest_path'],
'present',
False,
module.check_mode,
)

self.src_path = module.params['src_path']
self.src_content = module.params['src_content']
self.src_content_base64 = module.params['src_content_base64']
if self.src_content is not None:
self.input = to_bytes(self.src_content)
if self.src_content_base64:
try:
self.input = base64.b64decode(self.input)
except Exception as exc:
module.fail_json(msg='Cannot Base64 decode src_content: {exc}'.format(exc=exc))
else:
try:
with open(self.src_path, 'rb') as f:
self.input = f.read()
except Exception as exc:
module.fail_json(msg='Failure while reading file {fn}: {exc}'.format(fn=self.src_path, exc=exc))

self.format = module.params['format']
self.strict = module.params['strict']
self.wanted_pem_type = 'CERTIFICATE'

try:
self.input, self.input_format, dummy = parse_certificate(self.input, strict=self.strict)
except Exception as exc:
module.fail_json(msg='Error while parsing PEM: {exc}'.format(exc=exc))

self.backup = module.params['backup']
self.backup_file = None

module.params['path'] = self.path

self.dest_content = load_file_if_exists(self.path, module)
self.dest_content_format = None
self.dest_content_pem_type = None
if self.dest_content is not None:
try:
self.dest_content, self.dest_content_format, self.dest_content_pem_type = parse_certificate(
self.dest_content, strict=True)
except Exception:
pass

def needs_conversion(self):
if self.dest_content is None or self.dest_content_format is None:
return True
if self.dest_content_format != self.format:
return True
if self.input != self.dest_content:
return True
if self.format == 'pem' and self.dest_content_pem_type != self.wanted_pem_type:
return True
return False

def get_dest_certificate(self):
if self.format == 'der':
return self.input
data = to_bytes(base64.b64encode(self.input))
lines = [to_bytes('{0}{1}{2}'.format(PEM_START, self.wanted_pem_type, PEM_END))]
lines += [data[i:i + 64] for i in range(0, len(data), 64)]
lines.append(to_bytes('{0}{1}{2}\n'.format(PEM_END_START, self.wanted_pem_type, PEM_END)))
return b'\n'.join(lines)

def generate(self, module):
"""Do conversion."""
if self.needs_conversion():
# Convert
cert_data = self.get_dest_certificate()
if not self.check_mode:
if self.backup:
self.backup_file = module.backup_local(self.path)
write_file(module, cert_data)
self.changed = True

file_args = module.load_file_common_arguments(module.params)
if module.check_file_absent_if_check_mode(file_args['path']):
self.changed = True
else:
self.changed = module.set_fs_attributes_if_different(file_args, self.changed)

def dump(self):
"""Serialize the object into a dictionary."""
result = dict(
changed=self.changed,
)
if self.backup_file:
result['backup_file'] = self.backup_file

return result


def main():
argument_spec = dict(
src_path=dict(type='path'),
src_content=dict(type='str'),
src_content_base64=dict(type='bool', default=False),
format=dict(type='str', required=True, choices=['pem', 'der']),
strict=dict(type='bool', default=False),
dest_path=dict(type='path', required=True),
backup=dict(type='bool', default=False),
)
module = AnsibleModule(
argument_spec,
supports_check_mode=True,
add_file_common_args=True,
required_one_of=[('src_path', 'src_content')],
mutually_exclusive=[('src_path', 'src_content')],
)

base_dir = os.path.dirname(module.params['dest_path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
)

try:
cert = X509CertificateConvertModule(module)
cert.generate(module)
result = cert.dump()
module.exit_json(**result)
except OpenSSLObjectError as exc:
module.fail_json(msg=to_native(exc))


if __name__ == '__main__':
main()
7 changes: 7 additions & 0 deletions tests/integration/targets/x509_certificate_convert/aliases
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

azp/generic/2
azp/posix/2
destructive
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

dependencies:
- setup_openssl
- setup_remote_tmp_dir
- prepare_jinja2_compat
Loading

0 comments on commit 27a9ff1

Please sign in to comment.