This repository has been archived by the owner on Jan 18, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 431
Adding common sign_blob() service account types. #421
Merged
+280
−6
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,14 +116,29 @@ class TestAppAssertionCredentials(unittest.TestCase): | |
|
||
class AppIdentityStubImpl(apiproxy_stub.APIProxyStub): | ||
|
||
def __init__(self): | ||
def __init__(self, key_name=None, sig_bytes=None, | ||
svc_acct=None): | ||
super(TestAppAssertionCredentials.AppIdentityStubImpl, | ||
self).__init__('app_identity_service') | ||
self._key_name = key_name | ||
self._sig_bytes = sig_bytes | ||
self._sign_calls = [] | ||
self._svc_acct = svc_acct | ||
self._get_acct_name_calls = 0 | ||
|
||
def _Dynamic_GetAccessToken(self, request, response): | ||
response.set_access_token('a_token_123') | ||
response.set_expiration_time(time.time() + 1800) | ||
|
||
def _Dynamic_SignForApp(self, request, response): | ||
response.set_key_name(self._key_name) | ||
response.set_signature_bytes(self._sig_bytes) | ||
self._sign_calls.append(request.bytes_to_sign()) | ||
|
||
def _Dynamic_GetServiceAccountName(self, request, response): | ||
response.set_service_account_name(self._svc_acct) | ||
self._get_acct_name_calls += 1 | ||
|
||
class ErroringAppIdentityStubImpl(apiproxy_stub.APIProxyStub): | ||
|
||
def __init__(self): | ||
|
@@ -210,6 +225,49 @@ def test_create_scoped(self): | |
self.assertTrue(isinstance(new_credentials, AppAssertionCredentials)) | ||
self.assertEqual('dummy_scope', new_credentials.scope) | ||
|
||
def test_sign_blob(self): | ||
key_name = b'1234567890' | ||
sig_bytes = b'himom' | ||
app_identity_stub = self.AppIdentityStubImpl( | ||
key_name=key_name, sig_bytes=sig_bytes) | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', | ||
app_identity_stub) | ||
credentials = AppAssertionCredentials([]) | ||
to_sign = b'blob' | ||
self.assertEqual(app_identity_stub._sign_calls, []) | ||
result = credentials.sign_blob(to_sign) | ||
self.assertEqual(result, (key_name, sig_bytes)) | ||
self.assertEqual(app_identity_stub._sign_calls, [to_sign]) | ||
|
||
def test_service_account_email(self): | ||
acct_name = '[email protected]' | ||
app_identity_stub = self.AppIdentityStubImpl(svc_acct=acct_name) | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', | ||
app_identity_stub) | ||
|
||
credentials = AppAssertionCredentials([]) | ||
self.assertIsNone(credentials._service_account_email) | ||
self.assertEqual(app_identity_stub._get_acct_name_calls, 0) | ||
self.assertEqual(credentials.service_account_email, acct_name) | ||
self.assertIsNotNone(credentials._service_account_email) | ||
self.assertEqual(app_identity_stub._get_acct_name_calls, 1) | ||
|
||
def test_service_account_email_already_set(self): | ||
acct_name = '[email protected]' | ||
credentials = AppAssertionCredentials([]) | ||
credentials._service_account_email = acct_name | ||
|
||
app_identity_stub = self.AppIdentityStubImpl(svc_acct=acct_name) | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', | ||
app_identity_stub) | ||
|
||
self.assertEqual(app_identity_stub._get_acct_name_calls, 0) | ||
self.assertEqual(credentials.service_account_email, acct_name) | ||
self.assertEqual(app_identity_stub._get_acct_name_calls, 0) | ||
|
||
def test_get_access_token(self): | ||
app_identity_stub = self.AppIdentityStubImpl() | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,22 +17,25 @@ | |
import json | ||
from six.moves import http_client | ||
from six.moves import urllib | ||
import unittest | ||
import unittest2 | ||
|
||
import mock | ||
|
||
import httplib2 | ||
from oauth2client._helpers import _to_bytes | ||
from oauth2client.client import AccessTokenRefreshError | ||
from oauth2client.client import Credentials | ||
from oauth2client.client import save_to_well_known_file | ||
from oauth2client.contrib.gce import _DEFAULT_EMAIL_METADATA | ||
from oauth2client.contrib.gce import _get_service_account_email | ||
from oauth2client.contrib.gce import _SCOPES_WARNING | ||
from oauth2client.contrib.gce import AppAssertionCredentials | ||
|
||
|
||
__author__ = '[email protected] (Joe Gregorio)' | ||
|
||
|
||
class AppAssertionCredentialsTests(unittest.TestCase): | ||
class AppAssertionCredentialsTests(unittest2.TestCase): | ||
|
||
def test_constructor(self): | ||
credentials = AppAssertionCredentials(foo='bar') | ||
|
@@ -150,6 +153,49 @@ def test_create_scoped(self, warn_mock): | |
self.assertEqual('dummy_scope', new_credentials.scope) | ||
warn_mock.assert_called_once_with(_SCOPES_WARNING) | ||
|
||
def test_sign_blob_not_implemented(self): | ||
credentials = AppAssertionCredentials([]) | ||
with self.assertRaises(NotImplementedError): | ||
credentials.sign_blob(b'blob') | ||
|
||
@mock.patch('oauth2client.contrib.gce._get_service_account_email', | ||
return_value=(None, '[email protected]')) | ||
def test_service_account_email(self, get_email): | ||
credentials = AppAssertionCredentials([]) | ||
self.assertIsNone(credentials._service_account_email) | ||
self.assertEqual(credentials.service_account_email, | ||
get_email.return_value[1]) | ||
self.assertIsNotNone(credentials._service_account_email) | ||
get_email.assert_called_once_with() | ||
|
||
@mock.patch('oauth2client.contrib.gce._get_service_account_email') | ||
def test_service_account_email_already_set(self, get_email): | ||
credentials = AppAssertionCredentials([]) | ||
acct_name = '[email protected]' | ||
credentials._service_account_email = acct_name | ||
self.assertEqual(credentials.service_account_email, acct_name) | ||
get_email.assert_not_called() | ||
|
||
@mock.patch('oauth2client.contrib.gce._get_service_account_email') | ||
def test_service_account_email_failure(self, get_email): | ||
# Set-up the mock. | ||
bad_response = httplib2.Response({'status': http_client.NOT_FOUND}) | ||
content = b'bad-bytes-nothing-here' | ||
get_email.return_value = (bad_response, content) | ||
# Test the failure. | ||
credentials = AppAssertionCredentials([]) | ||
self.assertIsNone(credentials._service_account_email) | ||
with self.assertRaises(AttributeError) as exc_manager: | ||
getattr(credentials, 'service_account_email') | ||
|
||
error_msg = ('Failed to retrieve the email from the ' | ||
'Google Compute Engine metadata service') | ||
self.assertEqual( | ||
exc_manager.exception.args, | ||
(error_msg, bad_response, content)) | ||
self.assertIsNone(credentials._service_account_email) | ||
get_email.assert_called_once_with() | ||
|
||
def test_get_access_token(self): | ||
http = mock.MagicMock() | ||
http.request = mock.MagicMock( | ||
|
@@ -178,5 +224,43 @@ def test_save_to_well_known_file(self): | |
os.path.isdir = ORIGINAL_ISDIR | ||
|
||
|
||
class Test__get_service_account_email(unittest2.TestCase): | ||
|
||
def test_success(self): | ||
http_request = mock.MagicMock() | ||
acct_name = b'[email protected]' | ||
http_request.return_value = ( | ||
httplib2.Response({'status': http_client.OK}), acct_name) | ||
result = _get_service_account_email(http_request) | ||
self.assertEqual(result, (None, acct_name.decode('utf-8'))) | ||
http_request.assert_called_once_with( | ||
_DEFAULT_EMAIL_METADATA, | ||
headers={'Metadata-Flavor': 'Google'}) | ||
|
||
@mock.patch.object(httplib2.Http, 'request') | ||
def test_success_default_http(self, http_request): | ||
# Don't make _from_bytes() work too hard. | ||
acct_name = u'[email protected]' | ||
http_request.return_value = ( | ||
httplib2.Response({'status': http_client.OK}), acct_name) | ||
result = _get_service_account_email() | ||
self.assertEqual(result, (None, acct_name)) | ||
http_request.assert_called_once_with( | ||
_DEFAULT_EMAIL_METADATA, | ||
headers={'Metadata-Flavor': 'Google'}) | ||
|
||
def test_failure(self): | ||
http_request = mock.MagicMock() | ||
response = httplib2.Response({'status': http_client.NOT_FOUND}) | ||
content = b'Not found' | ||
http_request.return_value = (response, content) | ||
result = _get_service_account_email(http_request) | ||
|
||
self.assertEqual(result, (response, content)) | ||
http_request.assert_called_once_with( | ||
_DEFAULT_EMAIL_METADATA, | ||
headers={'Metadata-Flavor': 'Google'}) | ||
|
||
|
||
if __name__ == '__main__': # pragma: NO COVER | ||
unittest.main() | ||
unittest2.main() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
Sorry, something went wrong.