Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openssl_pkcs12: add cryptography backend #234

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Continue.
  • Loading branch information
felixfontein committed May 18, 2021
commit 4c4d54f184f28810ee7ced2fe370f85399cb03a6
88 changes: 54 additions & 34 deletions plugins/modules/openssl_pkcs12.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,13 @@
pyopenssl_found = True


def load_certificate_set(filename):
def load_certificate_set(filename, backend):
'''
Load list of concatenated PEM files, and return a list of parsed certificates.
'''
with open(filename, 'rb') as f:
data = f.read().decode('utf-8')
return [load_certificate(None, content=cert) for cert in split_pem_list(data)]
return [load_certificate(None, content=cert, backend=backend) for cert in split_pem_list(data)]


class PkcsError(OpenSSLObjectError):
Expand All @@ -267,6 +267,7 @@ def __init__(self, module):
module.params['force'],
module.check_mode
)
self.backend = 'pyopenssl'
self.action = module.params['action']
self.other_certificates = module.params['other_certificates']
self.other_certificates_parse_all = module.params['other_certificates_parse_all']
Expand All @@ -293,10 +294,10 @@ def __init__(self, module):
filenames = list(self.other_certificates)
self.other_certificates = []
for other_cert_bundle in filenames:
self.other_certificates.extend(load_certificate_set(other_cert_bundle))
self.other_certificates.extend(load_certificate_set(other_cert_bundle, self.backend))
else:
self.other_certificates = [
load_certificate(other_cert) for other_cert in self.other_certificates
load_certificate(other_cert, backend=self.backend) for other_cert in self.other_certificates
]

def generate_bytes(self, module):
Expand All @@ -314,28 +315,50 @@ def generate_bytes(self, module):

if self.privatekey_path:
try:
self.pkcs12.set_privatekey(load_privatekey(self.privatekey_path, self.privatekey_passphrase))
self.pkcs12.set_privatekey(
load_privatekey(self.privatekey_path, self.privatekey_passphrase, backend=self.backend))
except OpenSSLBadPassphraseError as exc:
raise PkcsError(exc)

return self.pkcs12.export(self.passphrase, self.iter_size, self.maciter_size)

def parse_bytes(self, pkcs12_content):
p12 = crypto.load_pkcs12(pkcs12_content, self.passphrase)
pkey = p12.get_privatekey()
if pkey is not None:
pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
crt = p12.get_certificate()
if crt is not None:
crt = crypto.dump_certificate(crypto.FILETYPE_PEM, crt)
other_certs = []
if p12.get_ca_certificates() is not None:
other_certs = [crypto.dump_certificate(crypto.FILETYPE_PEM,
other_cert) for other_cert in p12.get_ca_certificates()]

friendly_name = p12.get_friendlyname()

return (pkey, crt, other_certs, friendly_name)
try:
p12 = crypto.load_pkcs12(pkcs12_content, self.passphrase)
pkey = p12.get_privatekey()
if pkey is not None:
pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
crt = p12.get_certificate()
if crt is not None:
crt = crypto.dump_certificate(crypto.FILETYPE_PEM, crt)
other_certs = []
if p12.get_ca_certificates() is not None:
other_certs = [crypto.dump_certificate(crypto.FILETYPE_PEM,
other_cert) for other_cert in p12.get_ca_certificates()]

friendly_name = p12.get_friendlyname()

return (pkey, crt, other_certs, friendly_name)
except crypto.Error as exc:
raise PkcsError(exc)

def _dump_privatekey(self, pkcs12):
return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkcs12.get_privatekey())

def _dump_certificate(self, pkcs12):
return crypto.dump_certificate(crypto.FILETYPE_PEM, pkcs12.get_certificate())

def _dump_other_certificates(self, pkcs12):
return [
crypto.dump_certificate(crypto.FILETYPE_PEM, other_cert)
for other_cert in pkcs12.get_ca_certificates()
]

def _get_friendly_name(self, pkcs12):
return pkcs12.get_friendlyname()

def generate(self):
pass

def check(self, module, perms_required=True):
"""Ensure the resource is in its desired state."""
Expand All @@ -345,8 +368,8 @@ def check(self, module, perms_required=True):
def _check_pkey_passphrase():
if self.privatekey_passphrase:
try:
load_privatekey(self.privatekey_path, self.privatekey_passphrase)
except crypto.Error:
load_privatekey(self.privatekey_path, self.privatekey_passphrase, backend=self.backend)
except OpenSSLObjectError:
return False
except OpenSSLBadPassphraseError:
return False
Expand All @@ -360,28 +383,24 @@ def _check_pkey_passphrase():
self.src = self.path
try:
pkcs12_privatekey, pkcs12_certificate, pkcs12_other_certificates, pkcs12_friendly_name = self.parse()
except crypto.Error:
except OpenSSLObjectError:
return False
if (pkcs12_privatekey is not None) and (self.privatekey_path is not None):
expected_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM,
self.pkcs12.get_privatekey())
expected_pkey = self._dump_privatekey(self.pkcs12)
if pkcs12_privatekey != expected_pkey:
return False
elif bool(pkcs12_privatekey) != bool(self.privatekey_path):
return False

if (pkcs12_certificate is not None) and (self.certificate_path is not None):

expected_cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
self.pkcs12.get_certificate())
expected_cert = self._dump_certificate(self.pkcs12)
if pkcs12_certificate != expected_cert:
return False
elif bool(pkcs12_certificate) != bool(self.certificate_path):
return False

if (pkcs12_other_certificates is not None) and (self.other_certificates is not None):
expected_other_certs = [crypto.dump_certificate(crypto.FILETYPE_PEM,
other_cert) for other_cert in self.pkcs12.get_ca_certificates()]
expected_other_certs = self._dump_other_certificates(self.pkcs12)
if set(pkcs12_other_certificates) != set(expected_other_certs):
return False
elif bool(pkcs12_other_certificates) != bool(self.other_certificates):
Expand All @@ -390,15 +409,16 @@ def _check_pkey_passphrase():
if pkcs12_privatekey:
# This check is required because pyOpenSSL will not return a friendly name
# if the private key is not set in the file
if ((self.pkcs12.get_friendlyname() is not None) and (pkcs12_friendly_name is not None)):
if self.pkcs12.get_friendlyname() != pkcs12_friendly_name:
friendly_name = get_friendly_name(self.pkcs12)
if ((friendly_name is not None) and (pkcs12_friendly_name is not None)):
if friendly_name != pkcs12_friendly_name:
return False
elif bool(self.pkcs12.get_friendlyname()) != bool(pkcs12_friendly_name):
elif bool(friendly_name) != bool(pkcs12_friendly_name):
return False
elif module.params['action'] == 'parse' and os.path.exists(self.src) and os.path.exists(self.path):
try:
pkey, cert, other_certs, friendly_name = self.parse()
except crypto.Error:
except OpenSSLObjectError:
return False
expected_content = to_bytes(
''.join([to_native(pem) for pem in [pkey, cert] + other_certs if pem is not None])
Expand Down