Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pyflyte register optionally activates schedule #1832

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import rich_click as click
from typing_extensions import OrderedDict

from flytekit.clis.sdk_in_container.constants import make_field
from flytekit.clis.sdk_in_container.run import RunCommand, RunLevelParams, WorkflowCommand
from flytekit.clis.sdk_in_container.utils import make_field
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.workflow import PythonFunctionWorkflow
Expand Down
52 changes: 0 additions & 52 deletions flytekit/clis/sdk_in_container/constants.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
import typing
from dataclasses import Field, dataclass, field
from types import MappingProxyType

import click
import rich_click as _click

CTX_PROJECT = "project"
CTX_DOMAIN = "domain"
CTX_VERSION = "version"
Expand All @@ -13,48 +6,3 @@
CTX_NOTIFICATIONS = "notifications"
CTX_CONFIG_FILE = "config_file"
CTX_VERBOSE = "verbose"


def make_field(o: click.Option) -> Field:
if o.multiple:
o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}"
return field(default_factory=lambda: o.default, metadata={"click.option": o})
return field(default=o.default, metadata={"click.option": o})


def get_option_from_metadata(metadata: MappingProxyType) -> click.Option:
return metadata["click.option"]


@dataclass
class PyFlyteParams:
config_file: typing.Optional[str] = None
verbose: bool = False
pkgs: typing.List[str] = field(default_factory=list)

@classmethod
def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams":
return cls(**d)


project_option = _click.option(
"-p",
"--project",
required=True,
type=str,
help="Flyte project to use. You can have more than one project per repo",
)
domain_option = _click.option(
"-d",
"--domain",
required=True,
type=str,
help="This is usually development, staging, or production",
)
version_option = _click.option(
"-v",
"--version",
required=False,
type=str,
help="This is the version to apply globally for this context",
)
57 changes: 26 additions & 31 deletions flytekit/clis/sdk_in_container/launchplan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import rich_click as click
from rich.progress import Progress

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.models.launch_plan import LaunchPlanState

_launchplan_help = """
Expand All @@ -13,22 +15,8 @@


@click.command("launchplan", help=_launchplan_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Fecth launchplan from this project",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="Fetch launchplan from this domain",
)
@project_option_dec
@domain_option_dec
@click.option(
"--activate/--deactivate",
required=True,
Expand Down Expand Up @@ -57,18 +45,25 @@ def launchplan(
launchplan_version: str,
):
remote = get_and_save_remote_with_click_context(ctx, project, domain)
try:
launchplan = remote.fetch_launch_plan(
project=project,
domain=domain,
name=launchplan,
version=launchplan_version,
)
state = LaunchPlanState.ACTIVE if activate else LaunchPlanState.INACTIVE
remote.client.update_launch_plan(id=launchplan.id, state=state)
click.secho(
f"\n Launchplan was set to {LaunchPlanState.enum_to_string(state)}: {launchplan.name}:{launchplan.id.version}",
fg="green",
)
except StopIteration as e:
click.secho(f"{e.value}", fg="red")
with Progress() as progress:
t1 = progress.add_task(f"[cyan] {'Activating' if activate else 'Deactivating'}...", total=1)
try:
progress.start_task(t1)
launchplan = remote.fetch_launch_plan(
project=project,
domain=domain,
name=launchplan,
version=launchplan_version,
)
progress.advance(t1)

state = LaunchPlanState.ACTIVE if activate else LaunchPlanState.INACTIVE
remote.client.update_launch_plan(id=launchplan.id, state=state)
progress.advance(t1)
progress.update(t1, completed=True, visible=False)
click.secho(
f"\n Launchplan was set to {LaunchPlanState.enum_to_string(state)}: {launchplan.name}:{launchplan.id.version}",
fg="green",
)
except StopIteration as e:
click.secho(f"{e.value}", fg="red")
20 changes: 12 additions & 8 deletions flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from flytekit.clis.helpers import display_help_with_error
from flytekit.clis.sdk_in_container import constants
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context, patch_image_config
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.configuration import ImageConfig
from flytekit.configuration.default_images import DefaultImages
from flytekit.loggers import cli_logger
Expand All @@ -27,14 +28,8 @@


@click.command("register", help=_register_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Project to register and run this workflow in",
)
@project_option_dec
@domain_option_dec
@click.option(
"-d",
"--domain",
Expand Down Expand Up @@ -113,6 +108,13 @@
is_flag=True,
help="Execute registration in dry-run mode. Skips actual registration to remote",
)
@click.option(
"--activate-launchplans",
"--activate-launchplan",
default=False,
is_flag=True,
help="Activate newly registered Launchplans. This operation deactivates previous versions of Launchplans.",
)
@click.argument("package-or-module", type=click.Path(exists=True, readable=True, resolve_path=True), nargs=-1)
@click.pass_context
def register(
Expand All @@ -129,6 +131,7 @@ def register(
non_fast: bool,
package_or_module: typing.Tuple[str],
dry_run: bool,
activate_launchplans: bool,
):
"""
see help
Expand Down Expand Up @@ -179,6 +182,7 @@ def register(
package_or_module=package_or_module,
remote=remote,
dry_run=dry_run,
activate_launchplans=activate_launchplans,
)
except Exception as e:
raise e
34 changes: 10 additions & 24 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@
from rich.progress import Progress

from flytekit import Annotations, FlyteContext, Labels, Literal
from flytekit.clis.sdk_in_container.constants import PyFlyteParams, get_option_from_metadata, make_field
from flytekit.clis.sdk_in_container.helpers import get_remote, patch_image_config
from flytekit.clis.sdk_in_container.utils import pretty_print_exception
from flytekit.clis.sdk_in_container.utils import (
PyFlyteParams,
domain_option,
get_option_from_metadata,
make_field,
pretty_print_exception,
project_option,
)
from flytekit.configuration import DefaultImages, ImageConfig
from flytekit.core import context_manager
from flytekit.core.base_task import PythonTask
Expand Down Expand Up @@ -53,28 +59,8 @@ class RunLevelParams(PyFlyteParams):
This class is used to store the parameters that are used to run a workflow / task / launchplan.
"""

project: str = make_field(
click.Option(
param_decls=["-p", "--project"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"),
show_default=True,
help="Project to register and run this workflow in. Can also be set through envvar "
"``FLYTE_DEFAULT_PROJECT``",
)
)
domain: str = make_field(
click.Option(
param_decls=["-d", "--domain"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"),
show_default=True,
help="Domain to register and run this workflow in, can also be set through envvar "
"``FLYTE_DEFAULT_DOMAIN``",
)
)
project: str = make_field(project_option)
domain: str = make_field(domain_option)
destination_dir: str = make_field(
click.Option(
param_decls=["--destination-dir", "destination_dir"],
Expand Down
63 changes: 63 additions & 0 deletions flytekit/clis/sdk_in_container/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import os
import typing
from dataclasses import Field, dataclass, field
from types import MappingProxyType

import grpc
import rich_click as click
Expand All @@ -9,6 +12,44 @@
from flytekit.exceptions.user import FlyteInvalidInputException
from flytekit.loggers import cli_logger

project_option = click.Option(
param_decls=["-p", "--project"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"),
show_default=True,
help="Project to register and run this workflow in. Can also be set through envvar " "``FLYTE_DEFAULT_PROJECT``",
)

domain_option = click.Option(
param_decls=["-d", "--domain"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"),
show_default=True,
help="Domain to register and run this workflow in, can also be set through envvar " "``FLYTE_DEFAULT_DOMAIN``",
)

project_option_dec = click.option(
"-p",
"--project",
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"),
show_default=True,
help="Project for workflow/launchplan. Can also be set through envvar " "``FLYTE_DEFAULT_PROJECT``",
)

domain_option_dec = click.option(
"-d",
"--domain",
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"),
show_default=True,
help="Domain for workflow/launchplan, can also be set through envvar " "``FLYTE_DEFAULT_DOMAIN``",
)


def validate_package(ctx, param, values):
"""
Expand Down Expand Up @@ -87,3 +128,25 @@ def invoke(self, ctx: click.Context) -> typing.Any:
raise e
pretty_print_exception(e)
raise SystemExit(e) from e


def make_field(o: click.Option) -> Field:
if o.multiple:
o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}"
return field(default_factory=lambda: o.default, metadata={"click.option": o})
return field(default=o.default, metadata={"click.option": o})


def get_option_from_metadata(metadata: MappingProxyType) -> click.Option:
return metadata["click.option"]


@dataclass
class PyFlyteParams:
config_file: typing.Optional[str] = None
verbose: bool = False
pkgs: typing.List[str] = field(default_factory=list)

@classmethod
def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams":
return cls(**d)
7 changes: 7 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
NotificationList,
WorkflowExecutionGetDataResponse,
)
from flytekit.models.launch_plan import LaunchPlanState
from flytekit.models.literals import Literal, LiteralMap
from flytekit.remote.backfill import create_backfill_workflow
from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
Expand Down Expand Up @@ -1957,3 +1958,9 @@ def get_extra_headers_for_protocol(native_url):
if native_url.startswith("abfs://"):
return {"x-ms-blob-type": "BlockBlob"}
return {}

def activate_launchplan(self, ident: Identifier):
"""
Given a launchplan, activate it, all previous versions are deactivated.
"""
self.client.update_launch_plan(id=ident, state=LaunchPlanState.ACTIVE)
16 changes: 13 additions & 3 deletions flytekit/tools/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def load_packages_and_modules(
return registrable_entities


def secho(i: Identifier, state: str = "success", reason: str = None):
def secho(i: Identifier, state: str = "success", reason: str = None, op: str = "Registration"):
state_ind = "[ ]"
fg = "white"
nl = False
Expand All @@ -198,7 +198,7 @@ def secho(i: Identifier, state: str = "success", reason: str = None):
nl = True
reason = "skipped!"
click.secho(
click.style(f"{state_ind}", fg=fg) + f" Registration {i.name} type {i.resource_type_name()} {reason}",
click.style(f"{state_ind}", fg=fg) + f" {op} {i.name} type {i.resource_type_name()} {reason}",
dim=True,
nl=nl,
)
Expand All @@ -218,6 +218,7 @@ def register(
package_or_module: typing.Tuple[str],
remote: FlyteRemote,
dry_run: bool = False,
activate_launchplans: bool = False,
):
detected_root = find_common_root(package_or_module)
click.secho(f"Detected Root {detected_root}, using this to create deployable package...", fg="yellow")
Expand Down Expand Up @@ -262,14 +263,23 @@ def register(
return

for cp_entity in registrable_entities:
og_id = cp_entity.id if isinstance(cp_entity, launch_plan.LaunchPlan) else cp_entity.template.id
is_lp = False
if isinstance(cp_entity, launch_plan.LaunchPlan):
og_id = cp_entity.id
is_lp = True
else:
og_id = cp_entity.template.id
secho(og_id, "")
try:
if not dry_run:
i = remote.raw_register(
cp_entity, serialization_settings, version=version, create_default_launchplan=False
)
secho(i)
if is_lp and activate_launchplans:
secho(og_id, "", op="Activation")
remote.activate_launchplan(i)
secho(i, reason="activated", op="Activation")
else:
secho(og_id, reason="Dry run Mode!")
except RegistrationSkipped:
Expand Down