Skip to content

Commit

Permalink
Migrate to asyncio (#301)
Browse files Browse the repository at this point in the history
closes #283
  • Loading branch information
irux authored Aug 8, 2023
1 parent 85d4d04 commit 8db8896
Show file tree
Hide file tree
Showing 26 changed files with 641 additions and 460 deletions.
45 changes: 31 additions & 14 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, Optional
Expand Down Expand Up @@ -283,9 +284,13 @@ def deploy(
)

steps_to_apply = get_steps_to_apply(pipeline, steps)
for component in steps_to_apply:
log_action("Deploy", component)
component.deploy(dry_run)

async def async_deploy():
for component in steps_to_apply:
log_action("Deploy", component)
await component.deploy(dry_run)

asyncio.run(async_deploy())


@app.command(
Expand All @@ -306,9 +311,13 @@ def destroy(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_steps = reverse_pipeline_steps(pipeline, steps)
for component in pipeline_steps:
log_action("Destroy", component)
component.destroy(dry_run)

async def async_destroy():
for component in pipeline_steps:
log_action("Destroy", component)
await component.destroy(dry_run)

asyncio.run(async_destroy())


@app.command(
Expand All @@ -329,10 +338,14 @@ def reset(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_steps = reverse_pipeline_steps(pipeline, steps)
for component in pipeline_steps:
log_action("Reset", component)
component.destroy(dry_run)
component.reset(dry_run)

async def async_reset():
for component in pipeline_steps:
log_action("Reset", component)
await component.destroy(dry_run)
await component.reset(dry_run)

asyncio.run(async_reset())


@app.command(
Expand All @@ -353,10 +366,14 @@ def clean(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_steps = reverse_pipeline_steps(pipeline, steps)
for component in pipeline_steps:
log_action("Clean", component)
component.destroy(dry_run)
component.clean(dry_run)

async def async_clean():
for component in pipeline_steps:
log_action("Clean", component)
await component.destroy(dry_run)
await component.clean(dry_run)

asyncio.run(async_clean())


def version_callback(show_version: bool) -> None:
Expand Down
52 changes: 27 additions & 25 deletions kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import time
from time import sleep
from typing import Any

import httpx
Expand Down Expand Up @@ -32,13 +31,14 @@ def __init__(self, host: str | None):
)
log.error(error_message)
raise RuntimeError(error_message)
self._client = httpx.AsyncClient(base_url=host)
self._host: str = host

@property
def host(self) -> str:
return self._host

def create_connector(
async def create_connector(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> KafkaConnectResponse:
"""
Expand All @@ -50,8 +50,9 @@ def create_connector(
"""
config_json = kafka_connect_config.dict(exclude_none=True)
connect_data = {"name": connector_name, "config": config_json}
response = httpx.post(
url=f"{self._host}/connectors", headers=HEADERS, json=connect_data

response = await self._client.post(
url="/connectors", headers=HEADERS, json=connect_data
)
if response.status_code == httpx.codes.CREATED:
log.info(f"Connector {connector_name} created.")
Expand All @@ -61,19 +62,19 @@ def create_connector(
log.warning(
"Rebalancing in progress while creating a connector... Retrying..."
)
time.sleep(1)
self.create_connector(connector_name, kafka_connect_config)
await asyncio.sleep(1)
await self.create_connector(connector_name, kafka_connect_config)
raise KafkaConnectError(response)

def get_connector(self, connector_name: str) -> KafkaConnectResponse:
async def get_connector(self, connector_name: str) -> KafkaConnectResponse:
"""
Get information about the connector.
API Reference: https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)
        :param connector_name: Name of the created connector
:return: Information about the connector
"""
response = httpx.get(
url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
response = await self._client.get(
url=f"/connectors/{connector_name}", headers=HEADERS
)
if response.status_code == httpx.codes.OK:
log.info(f"Connector {connector_name} exists.")
Expand All @@ -86,11 +87,11 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse:
log.warning(
"Rebalancing in progress while getting a connector... Retrying..."
)
sleep(1)
self.get_connector(connector_name)
await asyncio.sleep(1)
await self.get_connector(connector_name)
raise KafkaConnectError(response)

def update_connector_config(
async def update_connector_config(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> KafkaConnectResponse:
"""
Expand All @@ -100,11 +101,12 @@ def update_connector_config(
:return: Information about the connector after the change has been made.
"""
config_json = kafka_connect_config.dict(exclude_none=True)
response = httpx.put(
url=f"{self._host}/connectors/{connector_name}/config",
response = await self._client.put(
url=f"/connectors/{connector_name}/config",
headers=HEADERS,
json=config_json,
)

data: dict = response.json()
if response.status_code == httpx.codes.OK:
log.info(f"Config for connector {connector_name} updated.")
Expand All @@ -118,8 +120,8 @@ def update_connector_config(
log.warning(
"Rebalancing in progress while updating a connector... Retrying..."
)
sleep(1)
self.update_connector_config(connector_name, kafka_connect_config)
await asyncio.sleep(1)
await self.update_connector_config(connector_name, kafka_connect_config)
raise KafkaConnectError(response)

@classmethod
Expand All @@ -132,7 +134,7 @@ def get_connector_config(
connector_config["name"] = connector_name
return connector_config

def validate_connector_config(
async def validate_connector_config(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> list[str]:
"""
Expand All @@ -145,8 +147,8 @@ def validate_connector_config(
config_json = self.get_connector_config(connector_name, kafka_connect_config)
connector_class = ConnectWrapper.get_connector_class_name(config_json)

response = httpx.put(
url=f"{self._host}/connector-plugins/{connector_class}/config/validate",
response = await self._client.put(
url=f"/connector-plugins/{connector_class}/config/validate",
headers=HEADERS,
json=config_json,
)
Expand All @@ -167,13 +169,13 @@ def validate_connector_config(
return errors
raise KafkaConnectError(response)

def delete_connector(self, connector_name: str) -> None:
async def delete_connector(self, connector_name: str) -> None:
"""
Deletes a connector, halting all tasks and deleting its configuration.
        API Reference: https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-(string-name)-
"""
response = httpx.delete(
url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
response = await self._client.delete(
url=f"/connectors/{connector_name}", headers=HEADERS
)
if response.status_code == httpx.codes.NO_CONTENT:
log.info(f"Connector {connector_name} deleted.")
Expand All @@ -185,8 +187,8 @@ def delete_connector(self, connector_name: str) -> None:
log.warning(
"Rebalancing in progress while deleting a connector... Retrying..."
)
sleep(1)
self.delete_connector(connector_name)
await asyncio.sleep(1)
await self.delete_connector(connector_name)
raise KafkaConnectError(response)

@staticmethod
Expand Down
62 changes: 41 additions & 21 deletions kpops/component_handlers/kafka_connect/kafka_connect_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
self._connect_wrapper = connect_wrapper
self._timeout = timeout

def create_connector(
async def create_connector(
self,
connector_name: str,
kafka_connect_config: KafkaConnectConfig,
Expand All @@ -46,58 +46,78 @@ def create_connector(
:param dry_run: If the connector creation should be run in dry run mode.
"""
if dry_run:
self.__dry_run_connector_creation(connector_name, kafka_connect_config)
await self.__dry_run_connector_creation(
connector_name, kafka_connect_config
)
else:
try:
timeout(
lambda: self._connect_wrapper.get_connector(connector_name),

async def get_connector_locally():
return await self._connect_wrapper.get_connector(connector_name)

await timeout(
get_connector_locally(),
secs=self._timeout,
)

timeout(
lambda: self._connect_wrapper.update_connector_config(
async def update_connector_locally():
await self._connect_wrapper.update_connector_config(
connector_name, kafka_connect_config
),
)

await timeout(
update_connector_locally(),
secs=self._timeout,
)

except ConnectorNotFoundException:
timeout(
lambda: self._connect_wrapper.create_connector(

async def create_connecto_locally():
await self._connect_wrapper.create_connector(
connector_name, kafka_connect_config
),
)

await timeout(
create_connecto_locally(),
secs=self._timeout,
)

def destroy_connector(self, connector_name: str, dry_run: bool) -> None:
async def destroy_connector(self, connector_name: str, dry_run: bool) -> None:
"""
Deletes a connector resource from the cluster.
:param connector_name: The connector name.
:param dry_run: If the connector deletion should be run in dry run mode.
"""
if dry_run:
self.__dry_run_connector_deletion(connector_name)
await self.__dry_run_connector_deletion(connector_name)
else:
try:
timeout(
lambda: self._connect_wrapper.get_connector(connector_name),

async def get_connector_locally():
return await self._connect_wrapper.get_connector(connector_name)

await timeout(
get_connector_locally(),
secs=self._timeout,
)

timeout(
lambda: self._connect_wrapper.delete_connector(connector_name),
async def delete_connector_locally():
await self._connect_wrapper.delete_connector(connector_name)

await timeout(
delete_connector_locally(),
secs=self._timeout,
)
except ConnectorNotFoundException:
log.warning(
f"Connector Destruction: the connector {connector_name} does not exist. Skipping."
)

def __dry_run_connector_creation(
async def __dry_run_connector_creation(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> None:
try:
connector_config = self._connect_wrapper.get_connector(connector_name)
connector_config = await self._connect_wrapper.get_connector(connector_name)

log.info(f"Connector Creation: connector {connector_name} already exists.")
if diff := render_diff(
Expand All @@ -119,7 +139,7 @@ def __dry_run_connector_creation(
log.debug("POST /connectors HTTP/1.1")
log.debug(f"HOST: {self._connect_wrapper.host}")

errors = self._connect_wrapper.validate_connector_config(
errors = await self._connect_wrapper.validate_connector_config(
connector_name, kafka_connect_config
)
if len(errors) > 0:
Expand All @@ -132,9 +152,9 @@ def __dry_run_connector_creation(
f"Connector Creation: connector config for {connector_name} is valid!"
)

def __dry_run_connector_deletion(self, connector_name: str) -> None:
async def __dry_run_connector_deletion(self, connector_name: str) -> None:
try:
self._connect_wrapper.get_connector(connector_name)
await self._connect_wrapper.get_connector(connector_name)
log.info(
magentaify(
f"Connector Destruction: connector {connector_name} already exists. Deleting connector."
Expand Down
15 changes: 4 additions & 11 deletions kpops/component_handlers/kafka_connect/timeout.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
import asyncio
import logging
from asyncio import TimeoutError
from typing import Callable, TypeVar
from typing import Any, Coroutine, TypeVar

log = logging.getLogger("Timeout")

T = TypeVar("T")


def timeout(func: Callable[..., T], *, secs: int = 0) -> T | None:
async def timeout(func: Coroutine[Any, Any, T], *, secs: int = 0) -> T | None:
"""
    Sets a timeout for awaiting a given coroutine
    :param func: The coroutine to await
:param secs: The timeout in seconds
"""

async def main_supervisor(func: Callable[..., T], secs: int) -> T:
runner = asyncio.to_thread(func)
task = asyncio.create_task(runner)
try:
task = asyncio.create_task(func)
if secs == 0:
return await task
else:
return await asyncio.wait_for(task, timeout=secs)

loop = asyncio.get_event_loop()
try:
complete = loop.run_until_complete(main_supervisor(func, secs))
return complete
except TimeoutError:
log.error(
f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml."
Expand Down
Loading

0 comments on commit 8db8896

Please sign in to comment.