Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pipeline steps parallelization #312

Merged
merged 45 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8db8896
Migrate to asyncio (#301)
irux Aug 8, 2023
fbaeb56
Merge with main
irux Aug 15, 2023
8c70b34
Fix poetry file
irux Aug 15, 2023
58e6cf5
Update with main
irux Aug 15, 2023
a41fafc
Update lock
irux Aug 15, 2023
4b440b8
Merge with main
irux Sep 12, 2023
e301e83
Linting
irux Sep 12, 2023
29d9d01
order imports
irux Sep 12, 2023
691fe62
Support networkx to represent components as a graph (#331)
irux Sep 18, 2023
f05717d
Merge with main
irux Oct 16, 2023
5990a2f
Fix poetry lock
irux Oct 16, 2023
c26f8b1
Fix ruff
irux Oct 16, 2023
3e396cd
Fix black
irux Oct 16, 2023
ed926a3
Merge main
irux Nov 20, 2023
d908308
Merge branch 'main' into feature/parallelization
irux Jan 2, 2024
3743ef6
Format toml
irux Jan 2, 2024
3df633a
Merge with v3
irux Jan 2, 2024
a041f90
Fix poetry lock
irux Jan 2, 2024
4a15f06
Formatted
irux Jan 2, 2024
1834a2b
Change names
irux Jan 2, 2024
65ddbda
Merge branch 'v3' into feature/parallelization
irux Jan 2, 2024
f51d902
Merge with v3
irux Jan 10, 2024
8364d10
Merge branch 'v3' into feature/parallelization
irux Jan 15, 2024
deb384c
Merge with v3
irux Jan 15, 2024
bcd3865
Execute operations in parallel (#372)
irux Jan 15, 2024
8bdc766
Merge branch 'v3' into feature/parallelization
irux Jan 16, 2024
29c5a16
Merge with main
irux Jan 22, 2024
37b03cc
Format
irux Jan 22, 2024
17ad3d4
Fix poetry lock
irux Jan 22, 2024
3520ae9
Change timeout params
irux Jan 22, 2024
9cae4bb
Implement comments and fix poetry lock
irux Jan 22, 2024
6dcab16
Make everything as a property
irux Jan 22, 2024
144c8f2
Fix types
irux Jan 22, 2024
3c9293e
Merge branch 'main' into feature/parallelization
irux Jan 22, 2024
5411d54
Put override on methods
irux Jan 22, 2024
2683351
Merge branch 'feature/parallelization' of https://github.com/bakdata/…
irux Jan 22, 2024
58cc85f
Implement comments
irux Jan 23, 2024
718336a
Change order
irux Jan 23, 2024
3172f1a
Implement changes
irux Jan 23, 2024
686003a
Implement changes
irux Jan 24, 2024
b80b34e
Improve help explanation
irux Jan 24, 2024
610a9ef
Update cli docs
irux Jan 24, 2024
5d2bec1
Add rich help
irux Jan 24, 2024
e0cc162
Update message
irux Jan 24, 2024
7e15c27
Merge branch 'main' into feature/parallelization
irux Jan 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ $ kpops clean [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops deploy`
Expand All @@ -73,6 +74,7 @@ $ kpops deploy [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops destroy`
Expand All @@ -99,6 +101,7 @@ $ kpops destroy [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops generate`
Expand Down Expand Up @@ -175,6 +178,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops schema`
Expand Down
180 changes: 132 additions & 48 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import asyncio
import logging
from collections.abc import Iterator
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -32,6 +32,8 @@
from kpops.utils.yaml import print_yaml

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator

from kpops.components.base_components import PipelineComponent


Expand Down Expand Up @@ -92,6 +94,13 @@
help="Whether to dry run the command or execute it",
)

PARALLEL: bool = typer.Option(
False,
"--parallel/--no-parallel",
rich_help_panel="EXPERIMENTAL: features in preview, not production-ready",
help="Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially.",
)


class FilterType(str, Enum):
INCLUDE = "include"
Expand Down Expand Up @@ -183,6 +192,26 @@ def is_in_steps(component: PipelineComponent) -> bool:
return filtered_steps


def get_reverse_concurrently_tasks_to_execute(
pipeline: Pipeline,
steps: str | None,
filter_type: FilterType,
runner: Callable[[PipelineComponent], Coroutine],
) -> Awaitable:
steps_to_apply = reverse_pipeline_steps(pipeline, steps, filter_type)
return pipeline.build_execution_graph_from(list(steps_to_apply), True, runner)


def get_concurrently_tasks_to_execute(
pipeline: Pipeline,
steps: str | None,
filter_type: FilterType,
runner: Callable[[PipelineComponent], Coroutine],
) -> Awaitable:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
return pipeline.build_execution_graph_from(steps_to_apply, False, runner)


def get_steps_to_apply(
pipeline: Pipeline, steps: str | None, filter_type: FilterType
) -> list[PipelineComponent]:
Expand Down Expand Up @@ -283,6 +312,7 @@ def generate(
environment,
verbose,
)

pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if output:
print_yaml(pipeline.to_yaml())
Expand Down Expand Up @@ -335,20 +365,33 @@ def deploy(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
for component in steps_to_apply:
async def deploy_runner(component: PipelineComponent):
log_action("Deploy", component)
component.deploy(dry_run)
await component.deploy(dry_run)

async def async_deploy():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

if parallel:
pipeline_tasks = get_concurrently_tasks_to_execute(
pipeline, steps, filter_type, deploy_runner
)
await pipeline_tasks
else:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
for component in steps_to_apply:
await deploy_runner(component)

asyncio.run(async_deploy())


@app.command(help="Destroy pipeline steps") # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
Expand All @@ -362,19 +405,34 @@ def destroy(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
async def destroy_runner(component: PipelineComponent):
log_action("Destroy", component)
component.destroy(dry_run)
await component.destroy(dry_run)

async def async_destroy():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)

pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

if parallel:
pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, destroy_runner
)
await pipeline_tasks
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await destroy_runner(component)

asyncio.run(async_destroy())


@app.command(help="Reset pipeline steps") # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
Expand All @@ -388,20 +446,33 @@ def reset(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
async def reset_runner(component: PipelineComponent):
log_action("Reset", component)
component.destroy(dry_run)
component.reset(dry_run)
await component.destroy(dry_run)
await component.reset(dry_run)

async def async_reset():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if parallel:
pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, reset_runner
)
await pipeline_tasks
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await reset_runner(component)

asyncio.run(async_reset())


@app.command(help="Clean pipeline steps") # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
Expand All @@ -415,20 +486,33 @@ def clean(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
async def clean_runner(component: PipelineComponent):
log_action("Clean", component)
component.destroy(dry_run)
component.clean(dry_run)
await component.destroy(dry_run)
await component.clean(dry_run)

async def async_clean():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if parallel:
pipeline_steps = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, clean_runner
)
await pipeline_steps
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await clean_runner(component)

asyncio.run(async_clean())


def version_callback(show_version: bool) -> None:
Expand Down
23 changes: 19 additions & 4 deletions kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
import re
import subprocess
Expand Down Expand Up @@ -74,7 +75,7 @@ def add_repo(
else:
self.__execute(["helm", "repo", "update"])

def upgrade_install(
async def upgrade_install(
self,
release_name: str,
chart: str,
Expand Down Expand Up @@ -103,9 +104,9 @@ def upgrade_install(
command.extend(flags.to_command())
if dry_run:
command.append("--dry-run")
return self.__execute(command)
return await self.__async_execute(command)

def uninstall(
async def uninstall(
self,
namespace: str,
release_name: str,
Expand All @@ -122,7 +123,7 @@ def uninstall(
if dry_run:
command.append("--dry-run")
try:
return self.__execute(command)
return await self.__async_execute(command)
except ReleaseNotFoundException:
log.warning(
f"Release with name {release_name} not found. Could not uninstall app."
Expand Down Expand Up @@ -229,6 +230,20 @@ def __execute(self, command: list[str]) -> str:
log.debug(process.stdout)
return process.stdout

async def __async_execute(self, command: list[str]):
command = self.__set_global_flags(command)
log.debug(f"Executing {' '.join(command)}")
proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdout, stderr = await proc.communicate()
Helm.parse_helm_command_stderr_output(stderr.decode())
log.debug(stdout)
return stdout.decode()

def __set_global_flags(self, command: list[str]) -> list[str]:
if self._context:
log.debug(f"Changing the Kubernetes context to {self._context}")
Expand Down
Loading
Loading