Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(code) add linter #17

Merged
merged 1 commit into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[flake8]
ignore = E203, W503
max-line-length = 130
max-complexity = 16
buildins = OAuth2Client
exclude = .git,.github,.venv,.eggs,build

3 changes: 3 additions & 0 deletions .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ jobs:

- name: Launch tests
run: poetry run pytest

- name: Launch Linting
run: poetry run flake8 --show-source --statistics
7 changes: 3 additions & 4 deletions oauth2_client/credentials_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import base64
from base64 import b64encode
import logging
from http import HTTPStatus
from threading import Event
Expand Down Expand Up @@ -38,8 +38,7 @@ def __init__(self, authorize_service: Optional[str],

@property
def authorization_header(self):
return 'Basic %s' % base64.b64encode(bytes('%s:%s' % (self.client_id, self.client_secret), 'UTF-8'))\
.decode('UTF-8')
return f"Basic {b64encode(bytes('%s:%s' % (self.client_id, self.client_secret), 'UTF-8')).decode('UTF-8')}"

@property
def public_api(self):
Expand Down Expand Up @@ -86,7 +85,7 @@ def _handle_bad_response(response: Response):
error = response.json()
raise OAuthError(HTTPStatus(response.status_code), error.get('error'), error.get('error_description'))
except BaseException as ex:
if type(ex) != OAuthError:
if not isinstance(ex, OAuthError):
_logger.exception(
'_handle_bad_response - error while getting error as json - %s - %s' % (type(ex), str(ex)))
raise OAuthError(HTTPStatus(response.status_code), 'unknown_error', response.text)
Expand Down
21 changes: 15 additions & 6 deletions oauth2_client/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from socketserver import TCPServer
from typing import Callable, Any, Type, Optional
from typing import Callable, Optional, Any
try:
from typing import Self
except ImportError: # before python 3.11
from typing_extensions import Self
from urllib.parse import unquote

_logger = logging.getLogger(__name__)


class _ReuseAddressTcpServer(TCPServer):
def __init__(self, host: str, port: int, handler_class: Type[BaseHTTPRequestHandler]):
def __init__(self, host: str, port: int, handler_class: Callable[[Any, Any, Self], BaseHTTPRequestHandler]):
self.allow_reuse_address = True
TCPServer.__init__(self, (host, port), handler_class)

Expand All @@ -32,11 +36,12 @@ class Handler(BaseHTTPRequestHandler):
def do_GET(self):
_logger.debug('GET - %s' % self.path)
params_received = read_request_parameters(self.path)
response = 'Response received (%s). Result was transmitted to the original thread. You can close this window.' % json.dumps(
params_received)
response = (f"Response received ({json.dumps(params_received)})."
" Result was transmitted to the original thread. You can close this window."
)
self.send_response(HTTPStatus.OK.value, 'OK')
self.send_header("Content-type", 'text/plain')
self.send_header("Content-Length", len(response))
self.send_header("Content-Length", str(len(response)))
self.end_headers()
try:
self.wfile.write(bytes(response, 'UTF-8'))
Expand All @@ -46,7 +51,11 @@ def do_GET(self):
self.wfile.flush()

_logger.debug('start_http_server - instantiating server to listen on "%s:%d"', host, port)
httpd = _ReuseAddressTcpServer(host, port, Handler)
httpd = _ReuseAddressTcpServer(
host,
port,
lambda request, client_address, server: Handler(request, client_address, server)
)

def serve():
_logger.debug('server daemon - starting server')
Expand Down
160 changes: 159 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[tool.black]
line-length = 130

[tool.poetry]
name = "oauth2-client"
version = "1.4.2"
Expand All @@ -14,6 +17,8 @@ python = ">=3.9"
requests = ">=2.5.0"

[tool.poetry.group.dev.dependencies]
black= "24.10.0"
flake8= "7.1.1"
pytest = "~8.2.2"

[tool.pytest.ini_options]
Expand Down
10 changes: 5 additions & 5 deletions tests/oauth2_client_tests/test_credential_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _handle_post(self, parameters):
self.end_headers()


class TestServer(object):
class MockTestServer(object):
def __init__(self, port, handler_class):
self.httpd = _ReuseAddressTcpServer('', port, handler_class)

Expand Down Expand Up @@ -112,7 +112,7 @@ def _check_get_parameters(self, parameters):
test_case.assertEqual(parameters.get('state', None), 'state_test')
test_case.assertEqual(parameters.get('client_id', None), 'client_id_test')
test_case.assertEqual(parameters.get('scope', None), 'scope1 scope2')
with TestServer(authorize_server_port, CheckAuthorizeHandler):
with MockTestServer(authorize_server_port, CheckAuthorizeHandler):
manager = None
try:
manager = CredentialManager(service_information, proxies=dict(http=''))
Expand All @@ -137,7 +137,7 @@ def _check_get_parameters(self, parameters):
# send bad state
parameters['state'] = 'other_state'

with TestServer(authorize_server_port, CheckAuthorizeHandlerbadState):
with MockTestServer(authorize_server_port, CheckAuthorizeHandlerbadState):
manager = None
try:
manager = CredentialManager(service_information, proxies=dict(http=''))
Expand Down Expand Up @@ -232,7 +232,7 @@ def _handle_request(self):
self.end_headers()
self.wfile.write(bytes(body, 'UTF-8'))

with TestServer(api_server_port, BearerHandler):
with MockTestServer(api_server_port, BearerHandler):
_logger.debug('test_get_token_with_code - server started')
api_url = 'http://localhost:%d/api/uri' % api_server_port
manager = CredentialManager(service_information, proxies=dict(http=''))
Expand Down Expand Up @@ -278,7 +278,7 @@ def _handle_post(self, parameters):
self.send_header("Content-Length", 0)
self.end_headers()

with TestServer(token_server_port, CheckGetTokenWithCode):
with MockTestServer(token_server_port, CheckGetTokenWithCode):
manager = CredentialManager(service_information, proxies=dict(http=''))
call_request(manager)
if not no_refresh_token:
Expand Down
2 changes: 1 addition & 1 deletion tests/oauth2_client_tests/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _extract_response(text):
if idx_start >= 0:
idx_end = text.find('}', idx_start)
if idx_end >= 0:
return json.loads(text[idx_start: idx_end+1])
return json.loads(text[idx_start: idx_end + 1])
else:
return dict()
else:
Expand Down
Loading