Skip to content

Commit

Permalink
Make split_pems() a method of OpenSSLIOPlugin
Browse files Browse the repository at this point in the history
  • Loading branch information
buchdag committed Dec 19, 2017
1 parent 412968e commit 9cf5091
Showing 1 changed file with 36 additions and 31 deletions.
67 changes: 36 additions & 31 deletions simp_le.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,29 +82,6 @@ class Error(Exception):
"""simp_le error."""


_PEM_RE_LABELCHAR = r'[\x21-\x2c\x2e-\x7e]'
_PEM_RE = re.compile(
(r"""
^-----BEGIN\ ((?:%s(?:[- ]?%s)*)?)\s*-----$
.*?
^-----END\ \1-----\s*""" % (_PEM_RE_LABELCHAR, _PEM_RE_LABELCHAR)).encode(),
re.DOTALL | re.MULTILINE | re.VERBOSE)
_PEMS_SEP = b'\n'


def split_pems(buf):
r"""Split buffer comprised of PEM encoded (RFC 7468).
>>> x = b'\n-----BEGIN FOO BAR-----\nfoo\nbar\n-----END FOO BAR-----'
>>> len(list(split_pems(x * 3)))
3
>>> list(split_pems(b''))
[]
"""
for match in _PEM_RE.finditer(buf):
yield match.group(0)


def gen_pkey(bits):
"""Generate a private key.
Expand Down Expand Up @@ -425,10 +402,26 @@ class OpenSSLIOPlugin(IOPlugin): # pylint: disable=abstract-method
typ: One of `OpenSSL.crypto.FILETYPE_*`, used in loading/dumping.
"""

_PEMS_SEP = b'\n'

def __init__(self, typ=OpenSSL.crypto.FILETYPE_PEM, **kwargs):
self.typ = typ
super(OpenSSLIOPlugin, self).__init__(**kwargs)

@staticmethod
def split_pems(data):
"""Split buffer comprised of PEM encoded (RFC 7468)."""
pem_re_labelchar = r'[\x21-\x2c\x2e-\x7e]'
pem_re = re.compile(
(r"""
^-----BEGIN\ ((?:%s(?:[- ]?%s)*)?)\s*-----$
.*?
^-----END\ \1-----\s*""" % (pem_re_labelchar,
pem_re_labelchar)).encode(),
re.DOTALL | re.MULTILINE | re.VERBOSE)
for match in pem_re.finditer(data):
yield match.group(0)

def load_key(self, data):
"""Load private key."""
return ComparablePKey(OpenSSL.crypto.load_privatekey(self.typ, data))
Expand Down Expand Up @@ -521,7 +514,7 @@ def persisted(self):
return self.Data(account_key=False, key=False, cert=False, chain=True)

def load_from_content(self, content):
pems = list(split_pems(content))
pems = list(self.split_pems(content))
return self.Data(
account_key=None,
key=None,
Expand All @@ -531,7 +524,7 @@ def load_from_content(self, content):

def save(self, data):
pems = [self.dump_cert(cert) for cert in data.chain]
return self.save_to_file(_PEMS_SEP.join(pems))
return self.save_to_file(self._PEMS_SEP.join(pems))


@IOPlugin.register(path='fullchain.pem', typ=OpenSSL.crypto.FILETYPE_PEM)
Expand All @@ -542,7 +535,7 @@ def persisted(self):
return self.Data(account_key=False, key=False, cert=True, chain=True)

def load_from_content(self, content):
pems = list(split_pems(content))
pems = list(self.split_pems(content))
return self.Data(
account_key=None,
key=None,
Expand All @@ -553,7 +546,7 @@ def load_from_content(self, content):
def save(self, data):
pems = [self.dump_cert(data.cert)]
pems.extend(self.dump_cert(cert) for cert in data.chain)
return self.save_to_file(_PEMS_SEP.join(pems))
return self.save_to_file(self._PEMS_SEP.join(pems))


@IOPlugin.register(path='full.pem', typ=OpenSSL.crypto.FILETYPE_PEM)
Expand All @@ -564,7 +557,7 @@ def persisted(self):
return self.Data(account_key=False, key=True, cert=True, chain=True)

def load_from_content(self, content):
pems = list(split_pems(content))
pems = list(self.split_pems(content))
return self.Data(
account_key=None,
key=self.load_key(pems[0]),
Expand All @@ -575,7 +568,7 @@ def load_from_content(self, content):
def save(self, data):
pems = [self.dump_key(data.key), self.dump_cert(data.cert)]
pems.extend(self.dump_cert(cert) for cert in data.chain)
return self.save_to_file(_PEMS_SEP.join(pems))
return self.save_to_file(self._PEMS_SEP.join(pems))


def load_pem_jwk(data):
Expand Down Expand Up @@ -653,7 +646,7 @@ def persisted(self):

def load(self):
"""Call the external script to retrieve persisted data."""
pems = list(split_pems(self.get_output_or_fail('load')))
pems = list(self.split_pems(self.get_output_or_fail('load')))
if not pems:
return self.EMPTY_DATA
persisted = self.persisted()
Expand Down Expand Up @@ -689,7 +682,7 @@ def save(self, data):
logger.exception(error)
raise Error(
'There was a problem executing external IO plugin script')
stdout, stderr = proc.communicate(_PEMS_SEP.join(output))
stdout, stderr = proc.communicate(self._PEMS_SEP.join(output))
if stdout is not None:
logger.debug('STDOUT: %s', stdout)
if stderr is not None:
Expand Down Expand Up @@ -764,6 +757,18 @@ def emit(self, record):
logger.removeHandler(handler)


class SplitPemsTest(UnitTestCase):
"""split_pems static method test."""
# this is a test suite | pylint: disable=missing-docstring

def test_split_pems(self):
pem = b'\n-----BEGIN FOO BAR-----\nfoo\nbar\n-----END FOO BAR-----'
result = len(list(OpenSSLIOPlugin.split_pems(pem * 3)))
self.assertEqual(result, 3)
result = list(OpenSSLIOPlugin.split_pems(b''))
self.assertEqual(result, [])


class PluginIOTestMixin(object):
"""Common plugins tests."""
# this is a test suite | pylint: disable=missing-docstring
Expand Down

0 comments on commit 9cf5091

Please sign in to comment.