Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to asyncio #301

Merged
merged 25 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, Optional
Expand Down Expand Up @@ -283,9 +284,13 @@ def deploy(
)

steps_to_apply = get_steps_to_apply(pipeline, steps)
for component in steps_to_apply:
log_action("Deploy", component)
component.deploy(dry_run)

async def _deploy_all() -> None:
    """Deploy every selected pipeline step in order, awaiting each call."""
    for step in steps_to_apply:
        log_action("Deploy", step)
        await step.deploy(dry_run)


# Drive the sequential deployment to completion on a fresh event loop.
asyncio.run(_deploy_all())


@app.command(
Expand All @@ -306,9 +311,13 @@ def destroy(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_steps = reverse_pipeline_steps(pipeline, steps)
for component in pipeline_steps:
log_action("Destroy", component)
component.destroy(dry_run)

async def _destroy_all() -> None:
    """Tear down each step of the (already reversed) pipeline sequentially."""
    for step in pipeline_steps:
        log_action("Destroy", step)
        await step.destroy(dry_run)


# Run the teardown coroutine on a new event loop.
asyncio.run(_destroy_all())


@app.command(
Expand All @@ -329,10 +338,14 @@ def reset(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_steps = reverse_pipeline_steps(pipeline, steps)
for component in pipeline_steps:
log_action("Reset", component)
component.destroy(dry_run)
component.reset(dry_run)

async def _reset_all() -> None:
    """Destroy and then reset each pipeline step, one awaited call at a time."""
    for step in pipeline_steps:
        log_action("Reset", step)
        await step.destroy(dry_run)
        await step.reset(dry_run)


# Execute the reset sequence on a fresh event loop.
asyncio.run(_reset_all())


@app.command(
Expand All @@ -353,10 +366,14 @@ def clean(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_steps = reverse_pipeline_steps(pipeline, steps)
for component in pipeline_steps:
log_action("Clean", component)
component.destroy(dry_run)
component.clean(dry_run)

async def _clean_all() -> None:
    """Destroy and then clean each pipeline step sequentially."""
    for step in pipeline_steps:
        log_action("Clean", step)
        await step.destroy(dry_run)
        await step.clean(dry_run)


# Execute the clean sequence on a new event loop.
asyncio.run(_clean_all())


def version_callback(show_version: bool) -> None:
Expand Down
52 changes: 27 additions & 25 deletions kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import time
from time import sleep
from typing import Any

import httpx
Expand Down Expand Up @@ -32,13 +31,14 @@ def __init__(self, host: str | None):
)
log.error(error_message)
raise RuntimeError(error_message)
self._client = httpx.AsyncClient(base_url=host)
self._host: str = host

@property
def host(self) -> str:
    """Base URL of the Kafka Connect REST API passed at construction."""
    return self._host

def create_connector(
async def create_connector(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> KafkaConnectResponse:
"""
Expand All @@ -50,8 +50,9 @@ def create_connector(
"""
config_json = kafka_connect_config.dict(exclude_none=True)
connect_data = {"name": connector_name, "config": config_json}
response = httpx.post(
url=f"{self._host}/connectors", headers=HEADERS, json=connect_data

response = await self._client.post(
url="/connectors", headers=HEADERS, json=connect_data
)
if response.status_code == httpx.codes.CREATED:
log.info(f"Connector {connector_name} created.")
Expand All @@ -61,19 +62,19 @@ def create_connector(
log.warning(
"Rebalancing in progress while creating a connector... Retrying..."
)
time.sleep(1)
self.create_connector(connector_name, kafka_connect_config)
await asyncio.sleep(1)
await self.create_connector(connector_name, kafka_connect_config)
raise KafkaConnectError(response)

def get_connector(self, connector_name: str) -> KafkaConnectResponse:
async def get_connector(self, connector_name: str) -> KafkaConnectResponse:
"""
Get information about the connector.
API Reference: https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)
:param connector_name: Name of the created connector
:return: Information about the connector
"""
response = httpx.get(
url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
response = await self._client.get(
url=f"/connectors/{connector_name}", headers=HEADERS
)
if response.status_code == httpx.codes.OK:
log.info(f"Connector {connector_name} exists.")
Expand All @@ -86,11 +87,11 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse:
log.warning(
"Rebalancing in progress while getting a connector... Retrying..."
)
sleep(1)
self.get_connector(connector_name)
await asyncio.sleep(1)
await self.get_connector(connector_name)
raise KafkaConnectError(response)

def update_connector_config(
async def update_connector_config(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> KafkaConnectResponse:
"""
Expand All @@ -100,11 +101,12 @@ def update_connector_config(
:return: Information about the connector after the change has been made.
"""
config_json = kafka_connect_config.dict(exclude_none=True)
response = httpx.put(
url=f"{self._host}/connectors/{connector_name}/config",
response = await self._client.put(
url=f"/connectors/{connector_name}/config",
headers=HEADERS,
json=config_json,
)

data: dict = response.json()
if response.status_code == httpx.codes.OK:
log.info(f"Config for connector {connector_name} updated.")
Expand All @@ -118,8 +120,8 @@ def update_connector_config(
log.warning(
"Rebalancing in progress while updating a connector... Retrying..."
)
sleep(1)
self.update_connector_config(connector_name, kafka_connect_config)
await asyncio.sleep(1)
await self.update_connector_config(connector_name, kafka_connect_config)
raise KafkaConnectError(response)

@classmethod
Expand All @@ -132,7 +134,7 @@ def get_connector_config(
connector_config["name"] = connector_name
return connector_config

def validate_connector_config(
async def validate_connector_config(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> list[str]:
"""
Expand All @@ -145,8 +147,8 @@ def validate_connector_config(
config_json = self.get_connector_config(connector_name, kafka_connect_config)
connector_class = ConnectWrapper.get_connector_class_name(config_json)

response = httpx.put(
url=f"{self._host}/connector-plugins/{connector_class}/config/validate",
response = await self._client.put(
url=f"/connector-plugins/{connector_class}/config/validate",
headers=HEADERS,
json=config_json,
)
Expand All @@ -167,13 +169,13 @@ def validate_connector_config(
return errors
raise KafkaConnectError(response)

def delete_connector(self, connector_name: str) -> None:
async def delete_connector(self, connector_name: str) -> None:
"""
Deletes a connector, halting all tasks and deleting its configuration.
API Reference: https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-(string-name)-
"""
response = httpx.delete(
url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
response = await self._client.delete(
url=f"/connectors/{connector_name}", headers=HEADERS
)
if response.status_code == httpx.codes.NO_CONTENT:
log.info(f"Connector {connector_name} deleted.")
Expand All @@ -185,8 +187,8 @@ def delete_connector(self, connector_name: str) -> None:
log.warning(
"Rebalancing in progress while deleting a connector... Retrying..."
)
sleep(1)
self.delete_connector(connector_name)
await asyncio.sleep(1)
await self.delete_connector(connector_name)
raise KafkaConnectError(response)

@staticmethod
Expand Down
62 changes: 41 additions & 21 deletions kpops/component_handlers/kafka_connect/kafka_connect_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
self._connect_wrapper = connect_wrapper
self._timeout = timeout

def create_connector(
async def create_connector(
self,
connector_name: str,
kafka_connect_config: KafkaConnectConfig,
Expand All @@ -46,58 +46,78 @@ def create_connector(
:param dry_run: If the connector creation should be run in dry run mode.
"""
if dry_run:
self.__dry_run_connector_creation(connector_name, kafka_connect_config)
await self.__dry_run_connector_creation(
connector_name, kafka_connect_config
)
else:
try:
timeout(
lambda: self._connect_wrapper.get_connector(connector_name),

async def get_connector_locally():
return await self._connect_wrapper.get_connector(connector_name)

await timeout(
get_connector_locally(),
secs=self._timeout,
)

timeout(
lambda: self._connect_wrapper.update_connector_config(
async def update_connector_locally():
await self._connect_wrapper.update_connector_config(
connector_name, kafka_connect_config
),
)

await timeout(
update_connector_locally(),
secs=self._timeout,
)

except ConnectorNotFoundException:
timeout(
lambda: self._connect_wrapper.create_connector(

async def create_connecto_locally():
await self._connect_wrapper.create_connector(
connector_name, kafka_connect_config
),
)

await timeout(
create_connecto_locally(),
secs=self._timeout,
)

def destroy_connector(self, connector_name: str, dry_run: bool) -> None:
async def destroy_connector(self, connector_name: str, dry_run: bool) -> None:
"""
Deletes a connector resource from the cluster.
:param connector_name: The connector name.
:param dry_run: If the connector deletion should be run in dry run mode.
"""
if dry_run:
self.__dry_run_connector_deletion(connector_name)
await self.__dry_run_connector_deletion(connector_name)
else:
try:
timeout(
lambda: self._connect_wrapper.get_connector(connector_name),

async def get_connector_locally():
return await self._connect_wrapper.get_connector(connector_name)

await timeout(
get_connector_locally(),
secs=self._timeout,
)

timeout(
lambda: self._connect_wrapper.delete_connector(connector_name),
async def delete_connector_locally():
await self._connect_wrapper.delete_connector(connector_name)

await timeout(
delete_connector_locally(),
secs=self._timeout,
)
except ConnectorNotFoundException:
log.warning(
f"Connector Destruction: the connector {connector_name} does not exist. Skipping."
)

def __dry_run_connector_creation(
async def __dry_run_connector_creation(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> None:
try:
connector_config = self._connect_wrapper.get_connector(connector_name)
connector_config = await self._connect_wrapper.get_connector(connector_name)

log.info(f"Connector Creation: connector {connector_name} already exists.")
if diff := render_diff(
Expand All @@ -119,7 +139,7 @@ def __dry_run_connector_creation(
log.debug("POST /connectors HTTP/1.1")
log.debug(f"HOST: {self._connect_wrapper.host}")

errors = self._connect_wrapper.validate_connector_config(
errors = await self._connect_wrapper.validate_connector_config(
connector_name, kafka_connect_config
)
if len(errors) > 0:
Expand All @@ -132,9 +152,9 @@ def __dry_run_connector_creation(
f"Connector Creation: connector config for {connector_name} is valid!"
)

def __dry_run_connector_deletion(self, connector_name: str) -> None:
async def __dry_run_connector_deletion(self, connector_name: str) -> None:
try:
self._connect_wrapper.get_connector(connector_name)
await self._connect_wrapper.get_connector(connector_name)
log.info(
magentaify(
f"Connector Destruction: connector {connector_name} already exists. Deleting connector."
Expand Down
15 changes: 4 additions & 11 deletions kpops/component_handlers/kafka_connect/timeout.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
import asyncio
import logging
from asyncio import TimeoutError
from typing import Callable, TypeVar
from typing import Any, Coroutine, TypeVar

log = logging.getLogger("Timeout")

T = TypeVar("T")


def timeout(func: Callable[..., T], *, secs: int = 0) -> T | None:
async def timeout(func: Coroutine[Any, Any, T], *, secs: int = 0) -> T | None:
"""
Set a timeout for a given coroutine.
:param func: The coroutine to await under the timeout
:param secs: The timeout in seconds
"""

async def main_supervisor(func: Callable[..., T], secs: int) -> T:
runner = asyncio.to_thread(func)
task = asyncio.create_task(runner)
try:
task = asyncio.create_task(func)
if secs == 0:
return await task
else:
return await asyncio.wait_for(task, timeout=secs)

loop = asyncio.get_event_loop()
try:
complete = loop.run_until_complete(main_supervisor(func, secs))
return complete
except TimeoutError:
log.error(
f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml."
Expand Down
Loading