From 64b92e297d6c3623fe272602cd1e234f020f0726 Mon Sep 17 00:00:00 2001 From: Sijun Liu Date: Thu, 22 Aug 2024 03:48:25 +0000 Subject: [PATCH] feat: add cred info to ADC credentials --- google/auth/_default.py | 2 + google/auth/compute_engine/credentials.py | 8 ++ google/auth/credentials.py | 4 + google/auth/external_account.py | 43 ++++++--- .../auth/external_account_authorized_user.py | 34 +++++-- google/auth/impersonated_credentials.py | 38 +++++--- google/oauth2/credentials.py | 90 +++++++------------ google/oauth2/service_account.py | 14 ++- tests/compute_engine/test_credentials.py | 7 ++ tests/oauth2/test_credentials.py | 16 ++++ tests/oauth2/test_service_account.py | 17 ++++ tests/test__default.py | 32 +++++++ tests/test_credentials.py | 5 ++ tests/test_external_account.py | 65 ++++++++++---- .../test_external_account_authorized_user.py | 16 ++++ tests/test_impersonated_credentials.py | 17 ++++ 16 files changed, 294 insertions(+), 114 deletions(-) diff --git a/google/auth/_default.py b/google/auth/_default.py index 63009dfb8..7bbcf8591 100644 --- a/google/auth/_default.py +++ b/google/auth/_default.py @@ -237,6 +237,7 @@ def _get_gcloud_sdk_credentials(quota_project_id=None): credentials, project_id = load_credentials_from_file( credentials_filename, quota_project_id=quota_project_id ) + credentials._cred_file_path = credentials_filename if not project_id: project_id = _cloud_sdk.get_project_id() @@ -270,6 +271,7 @@ def _get_explicit_environ_credentials(quota_project_id=None): credentials, project_id = load_credentials_from_file( os.environ[environment_vars.CREDENTIALS], quota_project_id=quota_project_id ) + credentials._cred_file_path = f"{explicit_file} file via the GOOGLE_APPLICATION_CREDENTIALS environment variable" return credentials, project_id diff --git a/google/auth/compute_engine/credentials.py b/google/auth/compute_engine/credentials.py index 008b991bb..f0126c0a8 100644 --- a/google/auth/compute_engine/credentials.py +++ b/google/auth/compute_engine/credentials.py @@ -157,6 +157,14 @@ def universe_domain(self): self._universe_domain_cached = True return self._universe_domain + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + return { + "credential_source": "metadata server", + "credential_type": "VM credentials", + "principal": self.service_account_email, + } + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) def with_quota_project(self, quota_project_id): creds = self.__class__( diff --git a/google/auth/credentials.py b/google/auth/credentials.py index e31930311..e5b0aea34 100644 --- a/google/auth/credentials.py +++ b/google/auth/credentials.py @@ -128,6 +128,10 @@ def universe_domain(self): """The universe domain value.""" return self._universe_domain + def get_cred_info(self): + """The credential information JSON.""" + return None + @abc.abstractmethod def refresh(self, request): """Refreshes the access token. diff --git a/google/auth/external_account.py b/google/auth/external_account.py index df0511f25..161e6c50c 100644 --- a/google/auth/external_account.py +++ b/google/auth/external_account.py @@ -186,6 +186,7 @@ def __init__( self._supplier_context = SupplierContext( self._subject_token_type, self._audience ) + self._cred_file_path = None if not self.is_workforce_pool and self._workforce_pool_user_project: # Workload identity pools do not support workforce pool user projects. @@ -321,11 +322,24 @@ def token_info_url(self): return self._token_info_url + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + cred_info_json = { + "credential_source": self._cred_file_path, + "credential_type": "external account credentials", + } + if self.service_account_email: + cred_info_json["principal"] = self.service_account_email + return cred_info_json + return None + @_helpers.copy_docstring(credentials.Scoped) def with_scopes(self, scopes, default_scopes=None): kwargs = self._constructor_args() kwargs.update(scopes=scopes, default_scopes=default_scopes) scoped = self.__class__(**kwargs) + scoped._cred_file_path = self._cred_file_path scoped._metrics_options = self._metrics_options return scoped @@ -442,30 +456,31 @@ def refresh(self, request): self.expiry = now + lifetime - @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): - # Return copy of instance with the provided quota project ID. + def _make_copy(self): kwargs = self._constructor_args() - kwargs.update(quota_project_id=quota_project_id) new_cred = self.__class__(**kwargs) + new_cred._cred_file_path = self._cred_file_path new_cred._metrics_options = self._metrics_options return new_cred + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + # Return copy of instance with the provided quota project ID. + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred + @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) def with_token_uri(self, token_uri): - kwargs = self._constructor_args() - kwargs.update(token_url=token_uri) - new_cred = self.__class__(**kwargs) - new_cred._metrics_options = self._metrics_options - return new_cred + cred = self._make_copy() + cred._token_url = token_uri + return cred @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) def with_universe_domain(self, universe_domain): - kwargs = self._constructor_args() - kwargs.update(universe_domain=universe_domain) - new_cred = self.__class__(**kwargs) - new_cred._metrics_options = self._metrics_options - return new_cred + cred = self._make_copy() + cred._universe_domain = universe_domain + return cred def _should_initialize_impersonated_credentials(self): return ( diff --git a/google/auth/external_account_authorized_user.py b/google/auth/external_account_authorized_user.py index f73387172..4d0c3c680 100644 --- a/google/auth/external_account_authorized_user.py +++ b/google/auth/external_account_authorized_user.py @@ -120,6 +120,7 @@ def __init__( self._quota_project_id = quota_project_id self._scopes = scopes self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN + self._cred_file_path = None if not self.valid and not self.can_refresh: raise exceptions.InvalidOperation( @@ -290,23 +291,38 @@ def refresh(self, request): def _make_sts_request(self, request): return self._sts_client.refresh_token(request, self._refresh_token) + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + return { + "credential_source": self._cred_file_path, + "credential_type": "external account authorized user credentials", + } + return None + + def _make_copy(self): + kwargs = self.constructor_args() + cred = self.__class__(**kwargs) + cred._cred_file_path = self._cred_file_path + return cred + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) def with_quota_project(self, quota_project_id): - kwargs = self.constructor_args() - kwargs.update(quota_project_id=quota_project_id) - return self.__class__(**kwargs) + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) def with_token_uri(self, token_uri): - kwargs = self.constructor_args() - kwargs.update(token_url=token_uri) - return self.__class__(**kwargs) + cred = self._make_copy() + cred._token_url = token_uri + return cred @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) def with_universe_domain(self, universe_domain): - kwargs = self.constructor_args() - kwargs.update(universe_domain=universe_domain) - return self.__class__(**kwargs) + cred = self._make_copy() + cred._universe_domain = universe_domain + return cred @classmethod def from_info(cls, info, **kwargs): diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index 3c6f8712a..c42a93643 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -226,6 +226,7 @@ def __init__( self.expiry = _helpers.utcnow() self._quota_project_id = quota_project_id self._iam_endpoint_override = iam_endpoint_override + self._cred_file_path = None def _metric_header_for_usage(self): return metrics.CRED_TYPE_SA_IMPERSONATE @@ -316,29 +317,40 @@ def signer(self): def requires_scopes(self): return not self._target_scopes - @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): - return self.__class__( + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + return { + "credential_source": self._cred_file_path, + "credential_type": "impersonated credentials", + "principal": self._target_principal, + } + return None + + def _make_copy(self): + cred = self.__class__( self._source_credentials, target_principal=self._target_principal, target_scopes=self._target_scopes, delegates=self._delegates, lifetime=self._lifetime, - quota_project_id=quota_project_id, + quota_project_id=self._quota_project_id, iam_endpoint_override=self._iam_endpoint_override, ) + cred._cred_file_path = self._cred_file_path + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred @_helpers.copy_docstring(credentials.Scoped) def with_scopes(self, scopes, default_scopes=None): - return self.__class__( - self._source_credentials, - target_principal=self._target_principal, - target_scopes=scopes or default_scopes, - delegates=self._delegates, - lifetime=self._lifetime, - quota_project_id=self._quota_project_id, - iam_endpoint_override=self._iam_endpoint_override, - ) + cred = self._make_copy() + cred._target_scopes = scopes or default_scopes + return cred class IDTokenCredentials(credentials.CredentialsWithQuotaProject): diff --git a/google/oauth2/credentials.py b/google/oauth2/credentials.py index 5ca00d4c5..f78437d7f 100644 --- a/google/oauth2/credentials.py +++ b/google/oauth2/credentials.py @@ -151,6 +151,7 @@ def __init__( self._trust_boundary = trust_boundary self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN self._account = account or "" + self._cred_file_path = None def __getstate__(self): """A __getstate__ method must exist for the __setstate__ to be called @@ -189,6 +190,7 @@ def __setstate__(self, d): self._universe_domain = ( d.get("_universe_domain") or credentials.DEFAULT_UNIVERSE_DOMAIN ) + self._cred_file_path = d.get("_cred_file_path") # The refresh_handler setter should be used to repopulate this. self._refresh_handler = None self._refresh_worker = None @@ -278,10 +280,8 @@ def account(self): """str: The user account associated with the credential. If the account is unknown an empty string is returned.""" return self._account - @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) - def with_quota_project(self, quota_project_id): - - return self.__class__( + def _make_copy(self): + cred = self.__class__( self.token, refresh_token=self.refresh_token, id_token=self.id_token, @@ -291,34 +291,36 @@ def with_quota_project(self, quota_project_id): scopes=self.scopes, default_scopes=self.default_scopes, granted_scopes=self.granted_scopes, - quota_project_id=quota_project_id, + quota_project_id=self.quota_project_id, rapt_token=self.rapt_token, enable_reauth_refresh=self._enable_reauth_refresh, trust_boundary=self._trust_boundary, universe_domain=self._universe_domain, account=self._account, ) + cred._cred_file_path = self._cred_file_path + return cred + + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + return { + "credential_source": self._cred_file_path, + "credential_type": "user credentials", + } + return None + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) def with_token_uri(self, token_uri): - - return self.__class__( - self.token, - refresh_token=self.refresh_token, - id_token=self.id_token, - token_uri=token_uri, - client_id=self.client_id, - client_secret=self.client_secret, - scopes=self.scopes, - default_scopes=self.default_scopes, - granted_scopes=self.granted_scopes, - quota_project_id=self.quota_project_id, - rapt_token=self.rapt_token, - enable_reauth_refresh=self._enable_reauth_refresh, - trust_boundary=self._trust_boundary, - universe_domain=self._universe_domain, - account=self._account, - ) + cred = self._make_copy() + cred._token_uri = token_uri + return cred def with_account(self, account): """Returns a copy of these credentials with a modified account. @@ -329,45 +331,15 @@ def with_account(self, account): Returns: google.oauth2.credentials.Credentials: A new credentials instance. """ - - return self.__class__( - self.token, - refresh_token=self.refresh_token, - id_token=self.id_token, - token_uri=self._token_uri, - client_id=self.client_id, - client_secret=self.client_secret, - scopes=self.scopes, - default_scopes=self.default_scopes, - granted_scopes=self.granted_scopes, - quota_project_id=self.quota_project_id, - rapt_token=self.rapt_token, - enable_reauth_refresh=self._enable_reauth_refresh, - trust_boundary=self._trust_boundary, - universe_domain=self._universe_domain, - account=account, - ) + cred = self._make_copy() + cred._account = account + return cred @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) def with_universe_domain(self, universe_domain): - - return self.__class__( - self.token, - refresh_token=self.refresh_token, - id_token=self.id_token, - token_uri=self._token_uri, - client_id=self.client_id, - client_secret=self.client_secret, - scopes=self.scopes, - default_scopes=self.default_scopes, - granted_scopes=self.granted_scopes, - quota_project_id=self.quota_project_id, - rapt_token=self.rapt_token, - enable_reauth_refresh=self._enable_reauth_refresh, - trust_boundary=self._trust_boundary, - universe_domain=universe_domain, - account=self._account, - ) + cred = self._make_copy() + cred._universe_domain = universe_domain + return cred def _metric_header_for_usage(self): return metrics.CRED_TYPE_USER diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py index 0e12868f1..98dafa3e3 100644 --- a/google/oauth2/service_account.py +++ b/google/oauth2/service_account.py @@ -173,6 +173,7 @@ def __init__( """ super(Credentials, self).__init__() + self._cred_file_path = None self._scopes = scopes self._default_scopes = default_scopes self._signer = signer @@ -220,7 +221,7 @@ def _from_signer_and_info(cls, signer, info, **kwargs): "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN ), trust_boundary=info.get("trust_boundary"), - **kwargs + **kwargs, ) @classmethod @@ -294,6 +295,7 @@ def _make_copy(self): always_use_jwt_access=self._always_use_jwt_access, universe_domain=self._universe_domain, ) + cred._cred_file_path = self._cred_file_path return cred @_helpers.copy_docstring(credentials.Scoped) @@ -503,6 +505,16 @@ def signer(self): def signer_email(self): return self._service_account_email + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + return { + "credential_source": self._cred_file_path, + "credential_type": "service account credentials", + "principal": self.service_account_email, + } + return None + class IDTokenCredentials( credentials.Signing, diff --git a/tests/compute_engine/test_credentials.py b/tests/compute_engine/test_credentials.py index bb29f8c6e..662210fa4 100644 --- a/tests/compute_engine/test_credentials.py +++ b/tests/compute_engine/test_credentials.py @@ -72,6 +72,13 @@ def credentials_fixture(self): universe_domain=FAKE_UNIVERSE_DOMAIN, ) + def test_get_cred_info(self): + assert self.credentials.get_cred_info() == { + "credential_source": "metadata server", + "credential_type": "VM credentials", + "principal": "default", + } + def test_default_state(self): assert not self.credentials.valid # Expiration hasn't been set yet diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py index 216641946..77db32a5a 100644 --- a/tests/oauth2/test_credentials.py +++ b/tests/oauth2/test_credentials.py @@ -71,6 +71,22 @@ def test_default_state(self): assert credentials.rapt_token == self.RAPT_TOKEN assert credentials.refresh_handler is None + def test_get_cred_info(self): + credentials = self.make_credentials() + assert not credentials.get_cred_info() + + credentials._cred_file_path = "/path/to/file" + assert credentials.get_cred_info() == { + "credential_source": "/path/to/file", + "credential_type": "user credentials", + } + + def test__make_copy_get_cred_info(self): + credentials = self.make_credentials() + credentials._cred_file_path = "/path/to/file" + cred_copy = credentials._make_copy() + assert cred_copy._cred_file_path == "/path/to/file" + def test_token_usage_metrics(self): credentials = self.make_credentials() credentials.token = "token" diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py index f16a43fb9..2c3fea5b2 100644 --- a/tests/oauth2/test_service_account.py +++ b/tests/oauth2/test_service_account.py @@ -68,6 +68,23 @@ def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN): universe_domain=universe_domain, ) + def test_get_cred_info(self): + credentials = self.make_credentials() + assert not credentials.get_cred_info() + + credentials._cred_file_path = "/path/to/file" + assert credentials.get_cred_info() == { + "credential_source": "/path/to/file", + "credential_type": "service account credentials", + "principal": "service-account@example.com", + } + + def test__make_copy_get_cred_info(self): + credentials = self.make_credentials() + credentials._cred_file_path = "/path/to/file" + cred_copy = credentials._make_copy() + assert cred_copy._cred_file_path == "/path/to/file" + def test_constructor_no_universe_domain(self): credentials = service_account.Credentials( SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, universe_domain=None diff --git a/tests/test__default.py b/tests/test__default.py index cb9a7c130..d17c747af 100644 --- a/tests/test__default.py +++ b/tests/test__default.py @@ -882,6 +882,38 @@ def test_default_early_out(unused_get): assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id) +@mock.patch( + "google.auth._default.load_credentials_from_file", + return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id), + autospec=True, +) +def test_default_cred_file_path_env_var(unused_load_cred, monkeypatch): + monkeypatch.setenv(environment_vars.CREDENTIALS, "/path/to/file") + cred, _ = _default.default() + assert ( + cred._cred_file_path + == "/path/to/file file via the GOOGLE_APPLICATION_CREDENTIALS environment variable" + ) + + +@mock.patch("os.path.isfile", return_value=True, autospec=True) +@mock.patch( + "google.auth._cloud_sdk.get_application_default_credentials_path", + return_value="/path/to/adc/file", + autospec=True, +) +@mock.patch( + "google.auth._default.load_credentials_from_file", + return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id), + autospec=True, +) +def test_default_cred_file_path_gcloud( + unused_load_cred, unused_get_adc_file, unused_isfile +): + cred, _ = _default.default() + assert cred._cred_file_path == "/path/to/adc/file" + + @mock.patch( "google.auth._default._get_explicit_environ_credentials", return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id), diff --git a/tests/test_credentials.py b/tests/test_credentials.py index 8e6bbc963..e11bcb4e5 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -52,6 +52,11 @@ def test_credentials_constructor(): assert not credentials._use_non_blocking_refresh +def test_credentials_get_cred_info(): + credentials = CredentialsImpl() + assert not credentials.get_cred_info() + + def test_with_non_blocking_refresh(): c = CredentialsImpl() c.with_non_blocking_refresh() diff --git a/tests/test_external_account.py b/tests/test_external_account.py index 3c372e629..bddcb4afa 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -275,6 +275,31 @@ def assert_resource_manager_request_kwargs( assert request_kwargs["headers"] == headers assert "body" not in request_kwargs + def test_get_cred_info(self): + credentials = self.make_credentials() + assert not credentials.get_cred_info() + + credentials._cred_file_path = "/path/to/file" + assert credentials.get_cred_info() == { + "credential_source": "/path/to/file", + "credential_type": "external account credentials", + } + + credentials._service_account_impersonation_url = ( + self.SERVICE_ACCOUNT_IMPERSONATION_URL + ) + assert credentials.get_cred_info() == { + "credential_source": "/path/to/file", + "credential_type": "external account credentials", + "principal": SERVICE_ACCOUNT_EMAIL, + } + + def test__make_copy_get_cred_info(self): + credentials = self.make_credentials() + credentials._cred_file_path = "/path/to/file" + cred_copy = credentials._make_copy() + assert cred_copy._cred_file_path == "/path/to/file" + def test_default_state(self): credentials = self.make_credentials( service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL @@ -469,25 +494,29 @@ def test_with_quota_project_full_options_propagated(self): with mock.patch.object( external_account.Credentials, "__init__", return_value=None ) as mock_init: - credentials.with_quota_project("project-foo") + new_cred = credentials.with_quota_project("project-foo") - # Confirm with_quota_project initialized the credential with the - # expected parameters and quota project ID. - mock_init.assert_called_once_with( - audience=self.AUDIENCE, - subject_token_type=self.SUBJECT_TOKEN_TYPE, - token_url=self.TOKEN_URL, - token_info_url=self.TOKEN_INFO_URL, - credential_source=self.CREDENTIAL_SOURCE, - service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, - service_account_impersonation_options={"token_lifetime_seconds": 2800}, - client_id=CLIENT_ID, - client_secret=CLIENT_SECRET, - quota_project_id="project-foo", - scopes=self.SCOPES, - default_scopes=["default1"], - universe_domain=DEFAULT_UNIVERSE_DOMAIN, - ) + # Confirm with_quota_project initialized the credential with the + # expected parameters. + mock_init.assert_called_once_with( + audience=self.AUDIENCE, + subject_token_type=self.SUBJECT_TOKEN_TYPE, + token_url=self.TOKEN_URL, + token_info_url=self.TOKEN_INFO_URL, + credential_source=self.CREDENTIAL_SOURCE, + service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, + service_account_impersonation_options={"token_lifetime_seconds": 2800}, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + quota_project_id=self.QUOTA_PROJECT_ID, + scopes=self.SCOPES, + default_scopes=["default1"], + universe_domain=DEFAULT_UNIVERSE_DOMAIN, + ) + + # Confirm with_quota_project sets the correct quota project after + # initialization. + assert new_cred.quota_project_id == "project-foo" def test_info(self): credentials = self.make_credentials(universe_domain="dummy_universe.com") diff --git a/tests/test_external_account_authorized_user.py b/tests/test_external_account_authorized_user.py index 743ee9c84..93926a131 100644 --- a/tests/test_external_account_authorized_user.py +++ b/tests/test_external_account_authorized_user.py @@ -83,6 +83,22 @@ def make_mock_request(cls, status=http_client.OK, data=None): return request + def test_get_cred_info(self): + credentials = self.make_credentials() + assert not credentials.get_cred_info() + + credentials._cred_file_path = "/path/to/file" + assert credentials.get_cred_info() == { + "credential_source": "/path/to/file", + "credential_type": "external account authorized user credentials", + } + + def test__make_copy_get_cred_info(self): + credentials = self.make_credentials() + credentials._cred_file_path = "/path/to/file" + cred_copy = credentials._make_copy() + assert cred_copy._cred_file_path == "/path/to/file" + def test_default_state(self): creds = self.make_credentials() diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index a2bf31bf8..83e260638 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -135,6 +135,23 @@ def make_credentials( iam_endpoint_override=iam_endpoint_override, ) + def test_get_cred_info(self): + credentials = self.make_credentials() + assert not credentials.get_cred_info() + + credentials._cred_file_path = "/path/to/file" + assert credentials.get_cred_info() == { + "credential_source": "/path/to/file", + "credential_type": "impersonated credentials", + "principal": "impersonated@project.iam.gserviceaccount.com", + } + + def test__make_copy_get_cred_info(self): + credentials = self.make_credentials() + credentials._cred_file_path = "/path/to/file" + cred_copy = credentials._make_copy() + assert cred_copy._cred_file_path == "/path/to/file" + def test_make_from_user_credentials(self): credentials = self.make_credentials( source_credentials=self.USER_SOURCE_CREDENTIALS