Skip to content

Commit

Permalink
Migrate requests to httpx (#302)
Browse files Browse the repository at this point in the history
closes #282
  • Loading branch information
irux authored Jul 26, 2023
1 parent ca6707d commit 301b4b0
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 229 deletions.
1 change: 1 addition & 0 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
)

logger = logging.getLogger()
logging.getLogger("httpx").setLevel(logging.WARNING)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomFormatter())
logger.addHandler(stream_handler)
Expand Down
36 changes: 18 additions & 18 deletions kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from time import sleep
from typing import Any

import requests
import httpx

from kpops.component_handlers.kafka_connect.exception import (
ConnectorNotFoundException,
Expand Down Expand Up @@ -50,14 +50,14 @@ def create_connector(
"""
config_json = kafka_connect_config.dict(exclude_none=True)
connect_data = {"name": connector_name, "config": config_json}
response = requests.post(
response = httpx.post(
url=f"{self._host}/connectors", headers=HEADERS, json=connect_data
)
if response.status_code == requests.status_codes.codes.created:
if response.status_code == httpx.codes.CREATED:
log.info(f"Connector {connector_name} created.")
log.debug(response.json())
return KafkaConnectResponse(**response.json())
elif response.status_code == requests.status_codes.codes.conflict:
elif response.status_code == httpx.codes.CONFLICT:
log.warning(
"Rebalancing in progress while creating a connector... Retrying..."
)
Expand All @@ -72,17 +72,17 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse:
:param connector_name: Nameof the crated connector
:return: Information about the connector
"""
response = requests.get(
response = httpx.get(
url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
)
if response.status_code == requests.status_codes.codes.ok:
if response.status_code == httpx.codes.OK:
log.info(f"Connector {connector_name} exists.")
log.debug(response.json())
return KafkaConnectResponse(**response.json())
elif response.status_code == requests.status_codes.codes.not_found:
elif response.status_code == httpx.codes.NOT_FOUND:
log.info(f"The named connector {connector_name} does not exists.")
raise ConnectorNotFoundException()
elif response.status_code == requests.status_codes.codes.conflict:
elif response.status_code == httpx.codes.CONFLICT:
log.warning(
"Rebalancing in progress while getting a connector... Retrying..."
)
Expand All @@ -100,21 +100,21 @@ def update_connector_config(
:return: Information about the connector after the change has been made.
"""
config_json = kafka_connect_config.dict(exclude_none=True)
response = requests.put(
response = httpx.put(
url=f"{self._host}/connectors/{connector_name}/config",
headers=HEADERS,
json=config_json,
)
data: dict = response.json()
if response.status_code == requests.status_codes.codes.ok:
if response.status_code == httpx.codes.OK:
log.info(f"Config for connector {connector_name} updated.")
log.debug(data)
return KafkaConnectResponse(**data)
if response.status_code == requests.status_codes.codes.created:
if response.status_code == httpx.codes.CREATED:
log.info(f"Connector {connector_name} created.")
log.debug(data)
return KafkaConnectResponse(**data)
elif response.status_code == requests.status_codes.codes.conflict:
elif response.status_code == httpx.codes.CONFLICT:
log.warning(
"Rebalancing in progress while updating a connector... Retrying..."
)
Expand Down Expand Up @@ -145,13 +145,13 @@ def validate_connector_config(
config_json = self.get_connector_config(connector_name, kafka_connect_config)
connector_class = ConnectWrapper.get_connector_class_name(config_json)

response = requests.put(
response = httpx.put(
url=f"{self._host}/connector-plugins/{connector_class}/config/validate",
headers=HEADERS,
json=config_json,
)

if response.status_code == requests.status_codes.codes.ok:
if response.status_code == httpx.codes.OK:
kafka_connect_error_response = KafkaConnectConfigErrorResponse(
**response.json()
)
Expand All @@ -172,16 +172,16 @@ def delete_connector(self, connector_name: str) -> None:
Deletes a connector, halting all tasks and deleting its configuration.
API Reference:https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-(string-name)-
"""
response = requests.delete(
response = httpx.delete(
url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
)
if response.status_code == requests.status_codes.codes.no_content:
if response.status_code == httpx.codes.NO_CONTENT:
log.info(f"Connector {connector_name} deleted.")
return
elif response.status_code == requests.status_codes.codes.not_found:
elif response.status_code == httpx.codes.NOT_FOUND:
log.info(f"The named connector {connector_name} does not exists.")
raise ConnectorNotFoundException()
elif response.status_code == requests.status_codes.codes.conflict:
elif response.status_code == httpx.codes.CONFLICT:
log.warning(
"Rebalancing in progress while deleting a connector... Retrying..."
)
Expand Down
4 changes: 2 additions & 2 deletions kpops/component_handlers/kafka_connect/exception.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kpops.component_handlers.utils.exception import RequestsException
from kpops.component_handlers.utils.exception import HttpxException


class ConnectorNotFoundException(Exception):
Expand All @@ -9,5 +9,5 @@ class ConnectorStateException(Exception):
pass


class KafkaConnectError(RequestsException):
class KafkaConnectError(HttpxException):
pass
4 changes: 2 additions & 2 deletions kpops/component_handlers/topic/exception.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kpops.component_handlers.utils.exception import RequestsException
from kpops.component_handlers.utils.exception import HttpxException


class TopicNotFoundException(Exception):
Expand All @@ -9,5 +9,5 @@ class TopicTransactionError(Exception):
pass


class KafkaRestProxyError(RequestsException):
class KafkaRestProxyError(HttpxException):
pass
34 changes: 17 additions & 17 deletions kpops/component_handlers/topic/proxy_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from functools import cached_property

import requests
import httpx

from kpops.cli.pipeline_config import PipelineConfig
from kpops.component_handlers.topic.exception import (
Expand Down Expand Up @@ -44,8 +44,8 @@ def cluster_id(self) -> str:
bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned.
:return: The Kafka cluster ID.
"""
response = requests.get(url=f"{self._host}/v3/clusters")
if response.status_code == requests.status_codes.codes.ok:
response = httpx.get(url=f"{self._host}/v3/clusters")
if response.status_code == httpx.codes.OK:
cluster_information = response.json()
return cluster_information["data"][0]["cluster_id"]

Expand All @@ -61,12 +61,12 @@ def create_topic(self, topic_spec: TopicSpec) -> None:
API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#post--clusters-cluster_id-topics
:param topic_spec: The topic specification.
"""
response = requests.post(
response = httpx.post(
url=f"{self._host}/v3/clusters/{self.cluster_id}/topics",
headers=HEADERS,
json=topic_spec.dict(exclude_none=True),
)
if response.status_code == requests.status_codes.codes.created:
if response.status_code == httpx.codes.CREATED:
log.info(f"Topic {topic_spec.topic_name} created.")
log.debug(response.json())
return
Expand All @@ -79,11 +79,11 @@ def delete_topic(self, topic_name: str) -> None:
API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#delete--clusters-cluster_id-topics-topic_name
:param topic_name: Name of the topic
"""
response = requests.delete(
response = httpx.delete(
url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}",
headers=HEADERS,
)
if response.status_code == requests.status_codes.codes.no_content:
if response.status_code == httpx.codes.NO_CONTENT:
log.info(f"Topic {topic_name} deleted.")
return

Expand All @@ -96,17 +96,17 @@ def get_topic(self, topic_name: str) -> TopicResponse:
:param topic_name: The topic name.
:return: Response of the get topic API
"""
response = requests.get(
response = httpx.get(
url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}",
headers=HEADERS,
)
if response.status_code == requests.status_codes.codes.ok:
if response.status_code == httpx.codes.OK:
log.debug(f"Topic {topic_name} found.")
log.debug(response.json())
return TopicResponse(**response.json())

elif (
response.status_code == requests.status_codes.codes.not_found
response.status_code == httpx.codes.NOT_FOUND
and response.json()["error_code"] == 40403
):
log.debug(f"Topic {topic_name} not found.")
Expand All @@ -122,18 +122,18 @@ def get_topic_config(self, topic_name: str) -> TopicConfigResponse:
:param topic_name: The topic name.
:return: The topic configuration.
"""
response = requests.get(
response = httpx.get(
url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs",
headers=HEADERS,
)

if response.status_code == requests.status_codes.codes.ok:
if response.status_code == httpx.codes.OK:
log.debug(f"Configs for {topic_name} found.")
log.debug(response.json())
return TopicConfigResponse(**response.json())

elif (
response.status_code == requests.status_codes.codes.not_found
response.status_code == httpx.codes.NOT_FOUND
and response.json()["error_code"] == 40403
):
log.debug(f"Configs for {topic_name} not found.")
Expand All @@ -149,12 +149,12 @@ def batch_alter_topic_config(self, topic_name: str, json_body: list[dict]) -> No
:param topic_name: The topic name.
:param config_name: The configuration parameter name.
"""
response = requests.post(
response = httpx.post(
url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter",
headers=HEADERS,
json={"data": json_body},
)
if response.status_code == requests.status_codes.codes.no_content:
if response.status_code == httpx.codes.NO_CONTENT:
log.info(f"Config of topic {topic_name} was altered.")
return

Expand All @@ -166,12 +166,12 @@ def get_broker_config(self) -> BrokerConfigResponse:
API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-brokers---configs
:return: The broker configuration.
"""
response = requests.get(
response = httpx.get(
url=f"{self.host}/v3/clusters/{self.cluster_id}/brokers/-/configs",
headers=HEADERS,
)

if response.status_code == requests.status_codes.codes.ok:
if response.status_code == httpx.codes.OK:
log.debug("Broker configs found.")
log.debug(response.json())
return BrokerConfigResponse(**response.json())
Expand Down
10 changes: 5 additions & 5 deletions kpops/component_handlers/utils/exception.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import logging

import requests
import httpx

log = logging.getLogger("RequestException")
log = logging.getLogger("HttpxException")


class RequestsException(Exception):
def __init__(self, response: requests.Response) -> None:
class HttpxException(Exception):
def __init__(self, response: httpx.Response) -> None:
self.error_code = response.status_code
self.error_msg = "Something went wrong!"
try:
log.error(
f"The request responded with the code {self.error_code}. Error body: {response.json()}"
)
response.raise_for_status()
except requests.HTTPError as e:
except httpx.HTTPError as e:
self.error_msg = str(e)
log.error(f"More information: {self.error_msg}")
super().__init__()
Loading

0 comments on commit 301b4b0

Please sign in to comment.