Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch id token from GCE metadata server #462

Merged
merged 4 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 46 additions & 26 deletions google/auth/compute_engine/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
self,
request,
target_audience,
token_uri=_DEFAULT_TOKEN_URI,
token_uri=None,
additional_claims=None,
service_account_email=None,
signer=None,
Expand All @@ -162,39 +162,52 @@ def __init__(
ignored.
use_metadata_identity_endpoint (bool): Whether to use GCE metadata
identity endpoint. For backward compatibility the default value
is False. If set to True, ``request`` argument and all other
keyword arguments (``token_uri``, ``additional_claims``,
``service_account_email``, ``signer``) will be ignored.
is False. If set to True, ``token_uri``, ``additional_claims``,
``service_account_email``, ``signer`` argument should not be set;
otherwise ValueError will be raised.

Raises:
ValueError:
If ``use_metadata_identity_endpoint`` is set to True, and one of
``token_uri``, ``additional_claims``, ``service_account_email``,
``signer`` arguments is set.
"""
super(IDTokenCredentials, self).__init__()

if use_metadata_identity_endpoint or service_account_email is None:
sa_info = _metadata.get_service_account_info(request)
service_account_email = sa_info["email"]
self._service_account_email = service_account_email

self._target_audience = target_audience
self._additional_claims = {}
self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
self._target_audience = target_audience

if use_metadata_identity_endpoint:
# If metadata identity endpoint is used, ignore all other keyword
# arguments
if token_uri or additional_claims or service_account_email or signer:
raise ValueError(
"If use_metadata_identity_endpoint is set, token_uri, "
"additional_claims, service_account_email, signer arguments"
" must not be set"
)
self._token_uri = None
self._additional_claims = None
self._signer = None

if service_account_email is None:
sa_info = _metadata.get_service_account_info(request)
self._service_account_email = sa_info["email"]
else:
self._service_account_email = service_account_email

if not use_metadata_identity_endpoint:
if signer is None:
signer = iam.Signer(
request=request,
credentials=Credentials(),
service_account_email=service_account_email,
service_account_email=self._service_account_email,
)
self._signer = signer

self._token_uri = token_uri
self._token_uri = _DEFAULT_TOKEN_URI

if additional_claims is not None:
self._additional_claims = additional_claims
else:
self._additional_claims = {}

def with_target_audience(self, target_audience):
"""Create a copy of these credentials with the specified target
Expand All @@ -208,15 +221,22 @@ def with_target_audience(self, target_audience):
"""
# since the signer is already instantiated,
# the request is not needed
return self.__class__(
None,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy(),
signer=self.signer,
use_metadata_identity_endpoint=self._use_metadata_identity_endpoint,
)
if self._use_metadata_identity_endpoint:
return self.__class__(
None,
target_audience=target_audience,
use_metadata_identity_endpoint=True,
)
else:
return self.__class__(
None,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy(),
signer=self.signer,
use_metadata_identity_endpoint=False,
)

def _make_authorization_grant_assertion(self):
"""Create the OAuth 2.0 assertion.
Expand Down Expand Up @@ -262,7 +282,7 @@ def _call_metadata_identity_endpoint(self, request):
try:
id_token = _metadata.get(
request,
"instance/service-accounts/default/identity?audience={}".format(
"instance/service-accounts/default/identity?audience={}&format=full".format(
self._target_audience
),
)
Expand Down
30 changes: 30 additions & 0 deletions tests/compute_engine/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,33 @@ def test_transport_error_from_metadata(self, get, get_service_account_info):
with pytest.raises(exceptions.RefreshError) as excinfo:
cred.refresh(request=mock.Mock())
assert excinfo.match(r"transport error")

def test_get_id_token_from_metadata_constructor(self):
with pytest.raises(ValueError):
credentials.IDTokenCredentials(
mock.Mock(),
"audience",
use_metadata_identity_endpoint=True,
token_uri="token_uri",
)
with pytest.raises(ValueError):
credentials.IDTokenCredentials(
mock.Mock(),
"audience",
use_metadata_identity_endpoint=True,
signer=mock.Mock(),
)
with pytest.raises(ValueError):
credentials.IDTokenCredentials(
mock.Mock(),
"audience",
use_metadata_identity_endpoint=True,
additional_claims={"key", "value"},
)
with pytest.raises(ValueError):
credentials.IDTokenCredentials(
mock.Mock(),
"audience",
use_metadata_identity_endpoint=True,
service_account_email="[email protected]",
)