Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement ES256 for JWT verification #340

Merged
merged 9 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/reference/google.auth.crypt.es256.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google.auth.crypt.es256 module
==============================

.. automodule:: google.auth.crypt.es256
:members:
:inherited-members:
:show-inheritance:
1 change: 1 addition & 0 deletions docs/reference/google.auth.crypt.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ Submodules
.. toctree::

google.auth.crypt.base
google.auth.crypt.es256
google.auth.crypt.rsa
1 change: 1 addition & 0 deletions docs/requirements-docs.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cryptography
sphinx-docstring-typing
urllib3
requests
Expand Down
16 changes: 15 additions & 1 deletion docs/user-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,21 @@ Impersonated ::
target_credentials,
target_audience=target_audience)

IDToken verification can be done for various type of IDTokens using the :class:`google.oauth2.id_token` module
IDToken verification can be done for various type of IDTokens using the
:class:`google.oauth2.id_token` module. It supports ID token signed with RS256
and ES256 algorithms. However, ES256 algorithm won't be available unless
`cryptography` dependency of version at least 1.4.0 is installed. You can check
the dependency with `pip freeze` or try `from google.auth.crypt import es256`.
The following is an example of verifying ID tokens:

from google.auth2 import id_token

request = google.auth.transport.requests.Request()

try:
decoded_token = id_token.verify_token(token_to_verify,request)
except ValueError:
# Verification failed.

A sample end-to-end flow using an ID Token against a Cloud Run endpoint maybe ::

Expand Down
34 changes: 30 additions & 4 deletions google/auth/crypt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,34 @@
private_key = open('private_key.pem').read()
signer = crypt.RSASigner.from_string(private_key)
signature = signer.sign(message)

The code above also works for :class:`ES256Signer` and :class:`ES256Verifier`.
Note that these two classes are only available if your `cryptography` dependency
version is at least 1.4.0.
"""

import six

from google.auth.crypt import base
from google.auth.crypt import rsa

try:
from google.auth.crypt import es256
except ImportError: # pragma: NO COVER
es256 = None

if es256 is not None: # pragma: NO COVER
__all__ = [
"ES256Signer",
"ES256Verifier",
"RSASigner",
"RSAVerifier",
"Signer",
"Verifier",
]
else: # pragma: NO COVER
__all__ = ["RSASigner", "RSAVerifier", "Signer", "Verifier"]

__all__ = ["RSASigner", "RSAVerifier", "Signer", "Verifier"]

# Aliases to maintain the v1.0.0 interface, as the crypt module was split
# into submodules.
Expand All @@ -48,9 +67,13 @@
RSASigner = rsa.RSASigner
RSAVerifier = rsa.RSAVerifier

if es256 is not None: # pragma: NO COVER
ES256Signer = es256.ES256Signer
ES256Verifier = es256.ES256Verifier


def verify_signature(message, signature, certs):
"""Verify an RSA cryptographic signature.
def verify_signature(message, signature, certs, verifier_cls=rsa.RSAVerifier):
"""Verify an RSA or ECDSA cryptographic signature.

Checks that the provided ``signature`` was generated from ``bytes`` using
the private key associated with the ``cert``.
Expand All @@ -60,6 +83,9 @@ def verify_signature(message, signature, certs):
signature (Union[str, bytes]): The cryptographic signature to check.
certs (Union[Sequence, str, bytes]): The certificate or certificates
to use to check the signature.
verifier_cls (Optional[~google.auth.crypt.base.Signer]): Which verifier
class to use for verification. This can be used to select different
algorithms, such as RSA or ECDSA. Default value is :class:`RSAVerifier`.

Returns:
bool: True if the signature is valid, otherwise False.
Expand All @@ -68,7 +94,7 @@ def verify_signature(message, signature, certs):
certs = [certs]

for cert in certs:
verifier = rsa.RSAVerifier.from_string(cert)
verifier = verifier_cls.from_string(cert)
if verifier.verify(message, signature):
return True
return False
145 changes: 145 additions & 0 deletions google/auth/crypt/es256.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""ECDSA (ES256) verifier and signer that use the ``cryptography`` library.
"""

import cryptography.exceptions
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
import cryptography.x509
import pkg_resources

from google.auth import _helpers
from google.auth.crypt import base

_IMPORT_ERROR_MSG = (
"cryptography>=1.4.0 is required to use cryptography-based ECDSA " "algorithms"
)

try: # pragma: NO COVER
release = pkg_resources.get_distribution("cryptography").parsed_version
if release < pkg_resources.parse_version("1.4.0"):
raise ImportError(_IMPORT_ERROR_MSG)
except pkg_resources.DistributionNotFound: # pragma: NO COVER
raise ImportError(_IMPORT_ERROR_MSG)


_CERTIFICATE_MARKER = b"-----BEGIN CERTIFICATE-----"
_BACKEND = backends.default_backend()
_PADDING = padding.PKCS1v15()


class ES256Verifier(base.Verifier):
"""Verifies ECDSA cryptographic signatures using public keys.

Args:
public_key (
cryptography.hazmat.primitives.asymmetric.ec.ECDSAPublicKey):
The public key used to verify signatures.
"""

def __init__(self, public_key):
self._pubkey = public_key

@_helpers.copy_docstring(base.Verifier)
def verify(self, message, signature):
message = _helpers.to_bytes(message)
try:
self._pubkey.verify(signature, message, ec.ECDSA(hashes.SHA256()))
return True
except (ValueError, cryptography.exceptions.InvalidSignature):
return False

@classmethod
def from_string(cls, public_key):
"""Construct an Verifier instance from a public key or public
certificate string.

Args:
public_key (Union[str, bytes]): The public key in PEM format or the
x509 public key certificate.

Returns:
Verifier: The constructed verifier.

Raises:
ValueError: If the public key can't be parsed.
"""
public_key_data = _helpers.to_bytes(public_key)

if _CERTIFICATE_MARKER in public_key_data:
cert = cryptography.x509.load_pem_x509_certificate(
public_key_data, _BACKEND
)
pubkey = cert.public_key()

else:
pubkey = serialization.load_pem_public_key(public_key_data, _BACKEND)

return cls(pubkey)


class ES256Signer(base.Signer, base.FromServiceAccountMixin):
"""Signs messages with an ECDSA private key.

Args:
private_key (
cryptography.hazmat.primitives.asymmetric.ec.ECDSAPrivateKey):
The private key to sign with.
key_id (str): Optional key ID used to identify this private key. This
can be useful to associate the private key with its associated
public key or certificate.
"""

def __init__(self, private_key, key_id=None):
self._key = private_key
self._key_id = key_id

@property
@_helpers.copy_docstring(base.Signer)
def key_id(self):
return self._key_id

@_helpers.copy_docstring(base.Signer)
def sign(self, message):
message = _helpers.to_bytes(message)
return self._key.sign(message, ec.ECDSA(hashes.SHA256()))

@classmethod
def from_string(cls, key, key_id=None):
"""Construct a RSASigner from a private key in PEM format.

Args:
key (Union[bytes, str]): Private key in PEM format.
key_id (str): An optional key id used to identify the private key.

Returns:
google.auth.crypt._cryptography_rsa.RSASigner: The
constructed signer.

Raises:
ValueError: If ``key`` is not ``bytes`` or ``str`` (unicode).
UnicodeDecodeError: If ``key`` is ``bytes`` but cannot be decoded
into a UTF-8 ``str``.
ValueError: If ``cryptography`` "Could not deserialize key data."
"""
key = _helpers.to_bytes(key)
private_key = serialization.load_pem_private_key(
key, password=None, backend=_BACKEND
)
return cls(private_key, key_id=key_id)
43 changes: 40 additions & 3 deletions google/auth/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,18 @@
from google.auth import exceptions
import google.auth.credentials

try:
from google.auth.crypt import es256
except ImportError: # pragma: NO COVER
es256 = None

_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_MAX_CACHE_SIZE = 10
_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier}
_CRYPTOGRAPHY_BASED_ALGORITHMS = set(["ES256"])

if es256 is not None: # pragma: NO COVER
_ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier


def encode(signer, payload, header=None, key_id=None):
Expand All @@ -83,7 +93,12 @@ def encode(signer, payload, header=None, key_id=None):
if key_id is None:
key_id = signer.key_id

header.update({"typ": "JWT", "alg": "RS256"})
header.update({"typ": "JWT"})

if es256 is not None and isinstance(signer, es256.ES256Signer):
header.update({"alg": "ES256"})
else:
header.update({"alg": "RS256"})

if key_id is not None:
header["kid"] = key_id
Expand Down Expand Up @@ -217,10 +232,30 @@ def decode(token, certs=None, verify=True, audience=None):
if not verify:
return payload

# Pluck the key id and algorithm from the header and make sure we have
# a verifier that can support it.
key_alg = header.get("alg")
key_id = header.get("kid")

try:
verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
except KeyError as exc:
if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS:
six.raise_from(
ValueError(
"The key algorithm {} requires the cryptography package "
"to be installed.".format(key_alg)
),
exc,
)
else:
six.raise_from(
ValueError("Unsupported signature algorithm {}".format(key_alg)), exc
)

# If certs is specified as a dictionary of key IDs to certificates, then
# use the certificate identified by the key ID in the token header.
if isinstance(certs, Mapping):
key_id = header.get("kid")
if key_id:
if key_id not in certs:
raise ValueError("Certificate for key id {} not found.".format(key_id))
Expand All @@ -232,7 +267,9 @@ def decode(token, certs=None, verify=True, audience=None):
certs_to_check = certs

# Verify that the signature matches the message.
if not crypt.verify_signature(signed_section, signature, certs_to_check):
if not crypt.verify_signature(
signed_section, signature, certs_to_check, verifier_cls
):
raise ValueError("Could not verify token signature.")

# Verify the issued at and created times in the payload.
Expand Down
Loading