Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyte deck] Streaming Decks #2779

Merged
merged 85 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
01182b4
[Flyte Decl] Streaming Decks
Future-Outlier Oct 1, 2024
9616fc3
print
Future-Outlier Oct 1, 2024
98ae7c7
sleep more
Future-Outlier Oct 1, 2024
4e92bb0
add dummy deck
Future-Outlier Oct 2, 2024
4df18b5
nit
Future-Outlier Oct 2, 2024
ebb4d4e
dummy deck
Future-Outlier Oct 2, 2024
99522d9
update
Future-Outlier Oct 2, 2024
c19d67d
nit
Future-Outlier Oct 2, 2024
67cd829
test
Future-Outlier Oct 2, 2024
06da3df
return html
Future-Outlier Oct 2, 2024
6d99d69
Change Deck
Future-Outlier Oct 2, 2024
b805cd7
fix
Future-Outlier Oct 2, 2024
4c97758
fix recursion error
Future-Outlier Oct 2, 2024
7b3574a
remove redundant code
Future-Outlier Oct 2, 2024
9b60564
add dummy deck to deck init
Future-Outlier Oct 2, 2024
18c994f
Better Dummy Deck Logic
Future-Outlier Oct 2, 2024
39f39d1
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Oct 7, 2024
aabcbbb
Deck Publish
Future-Outlier Oct 7, 2024
9ca43f3
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Nov 25, 2024
ed56352
litn
Future-Outlier Nov 27, 2024
b559fc9
remove dummy deck
Future-Outlier Nov 27, 2024
fc5578f
nit
Future-Outlier Nov 27, 2024
7139468
use auto refresh tab, 5 seconds as interval
Future-Outlier Dec 2, 2024
e0aee9e
revert
Future-Outlier Dec 2, 2024
3727588
test setDynamicTabs
Future-Outlier Dec 2, 2024
ce3ee15
change interval time
Future-Outlier Dec 2, 2024
d066231
test
Future-Outlier Dec 2, 2024
f9387ce
revert
Future-Outlier Dec 2, 2024
f14c3fa
test
Future-Outlier Dec 2, 2024
c33a909
nit
Future-Outlier Dec 2, 2024
8666c60
try dynamic containers
Future-Outlier Dec 2, 2024
93580d6
try dynamic containers v2
Future-Outlier Dec 2, 2024
bcaaabd
try dynamic containers v3
Future-Outlier Dec 2, 2024
a321700
debug
Future-Outlier Dec 2, 2024
6464fae
update
Future-Outlier Dec 2, 2024
884943c
nit
Future-Outlier Dec 3, 2024
d4b5b96
Refresh Botton
Future-Outlier Dec 3, 2024
406227c
fix
Future-Outlier Dec 3, 2024
1e77f54
lint
Future-Outlier Dec 3, 2024
d70a2d5
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Dec 3, 2024
7fc6393
test new refresh
Future-Outlier Dec 3, 2024
6980140
lint
Future-Outlier Dec 3, 2024
c87a342
Revert back html code, collaborating with Lyon
Future-Outlier Dec 5, 2024
f609760
lint
Future-Outlier Dec 5, 2024
473ae11
nit
Future-Outlier Dec 5, 2024
008fe52
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Dec 12, 2024
f0b9028
nit
Future-Outlier Dec 16, 2024
d48efa9
update
Future-Outlier Dec 17, 2024
6b55930
better code
Future-Outlier Dec 17, 2024
b5912fb
update
Future-Outlier Dec 17, 2024
a681ccd
some notes for giving user params builder deck enabled
Future-Outlier Dec 18, 2024
048fdff
update
Future-Outlier Jan 2, 2025
7bcf15e
raise error when disabled deck and called Deck.publish()
Future-Outlier Jan 2, 2025
b6c41c3
lint
Future-Outlier Jan 2, 2025
be02f9f
lint
Future-Outlier Jan 2, 2025
cf83e06
update
Future-Outlier Jan 8, 2025
e137328
static method by YEE
Future-Outlier Jan 9, 2025
a59a56e
make Deck.publish more like a wrapper by moving enable deck checking …
Future-Outlier Jan 9, 2025
b8383be
lint
Future-Outlier Jan 9, 2025
dc6d203
print monodocs err
Future-Outlier Jan 9, 2025
41d8760
Fix monodocs
Future-Outlier Jan 9, 2025
b71cc19
use builder
Future-Outlier Jan 9, 2025
b5976fe
add translator test for deck serialization settings
Future-Outlier Jan 9, 2025
0c1a5a3
update
Future-Outlier Jan 13, 2025
d082456
fix
Future-Outlier Jan 13, 2025
4a8c68f
test
Future-Outlier Jan 13, 2025
b58527b
update
Future-Outlier Jan 13, 2025
2764ed4
remove blank
Future-Outlier Jan 13, 2025
90372db
update kevin's advice
Future-Outlier Jan 23, 2025
d8c408c
master-branch-idl
Future-Outlier Jan 23, 2025
5cacf11
update
Future-Outlier Feb 3, 2025
c447793
Use ENABLE_DECK as constant
Future-Outlier Feb 4, 2025
c5cc967
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Feb 4, 2025
1d6417a
update pydoc lint
Future-Outlier Feb 4, 2025
b0cd1ae
unit tests for enable deck ctx user param
Future-Outlier Feb 4, 2025
974b882
update agent's advice
Future-Outlier Feb 4, 2025
ea8b6e0
use pytest.mark.parametrize in test
Future-Outlier Feb 4, 2025
a642c9d
update
Future-Outlier Feb 4, 2025
e9aef35
update
Future-Outlier Feb 4, 2025
04775d7
deck
Future-Outlier Feb 4, 2025
34a4146
update
Future-Outlier Feb 4, 2025
f30382b
Merge branch 'master' into flytekit-streaming-deck
Future-Outlier Feb 4, 2025
b649f8f
add executiuon parameter test
Future-Outlier Feb 4, 2025
0565282
Only wrap `generates_deck` in a BoolValue when moving in and out of p…
eapolinario Feb 4, 2025
c0fd23a
Streaming Deck
Future-Outlier Feb 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def _dispatch_execute(
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")

if task_def is not None and not getattr(task_def, "disable_deck", True):
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params)
_output_deck(task_name=task_def.name.split(".")[-1], new_user_params=ctx.user_space_params)

logger.debug("Finished _dispatch_execute")

Expand Down
46 changes: 30 additions & 16 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,18 @@ class TaskMetadata(object):

See the :std:ref:`IDL <idl:protos/docs/core/core:taskmetadata>` for the protobuf definition.

Args:
cache (bool): Indicates if caching should be enabled. See :std:ref:`Caching <cookbook:caching>`
cache_serialize (bool): Indicates if identical (ie. same inputs) instances of this task should be executed in serial when caching is enabled. See :std:ref:`Caching <cookbook:caching>`
cache_version (str): Version to be used for the cached value
cache_ignore_input_vars (Tuple[str, ...]): Input variables that should not be included when calculating hash for cache
interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with
lower QoS guarantees that can include pre-emption. This can reduce the monetary cost executions incur at the
cost of performance penalties due to potential interruptions
deprecated (str): Can be used to provide a warning message for deprecated task. Absence or empty str indicates
that the task is active and not deprecated
Attributes:
cache (bool): Indicates if caching should be enabled. See :std:ref:`Caching <cookbook:caching>`.
cache_serialize (bool): Indicates if identical (i.e. same inputs) instances of this task should be executed in serial when caching is enabled. See :std:ref:`Caching <cookbook:caching>`.
cache_version (str): Version to be used for the cached value.
cache_ignore_input_vars (Tuple[str, ...]): Input variables that should not be included when calculating hash for cache.
interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees that can include pre-emption.
deprecated (str): Can be used to provide a warning message for a deprecated task. An absence or empty string indicates that the task is active and not deprecated.
retries (int): for retries=n; n > 0, on failures of this task, the task will be retried at-least n number of times.
timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task
should be executed for. The execution will be terminated if the runtime exceeds the given timeout
(approximately)
pod_template_name (Optional[str]): the name of existing PodTemplate resource in the cluster which will be used in this task.
timeout (Optional[Union[datetime.timedelta, int]]): The maximum duration for which one execution of this task should run. The execution will be terminated if the runtime exceeds this timeout.
pod_template_name (Optional[str]): The name of an existing PodTemplate resource in the cluster which will be used for this task.
generates_deck (bool): Indicates whether the task will generate a Deck URI.
is_eager (bool): Indicates whether the task should be treated as eager.
"""

cache: bool = False
Expand All @@ -141,6 +138,7 @@ class TaskMetadata(object):
retries: int = 0
timeout: Optional[Union[datetime.timedelta, int]] = None
pod_template_name: Optional[str] = None
generates_deck: bool = False
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
is_eager: bool = False

def __post_init__(self):
Expand Down Expand Up @@ -179,6 +177,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata:
discovery_version=self.cache_version,
deprecated_error_message=self.deprecated,
cache_serializable=self.cache_serialize,
generates_deck=self.generates_deck,
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
pod_template_name=self.pod_template_name,
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
Expand Down Expand Up @@ -720,11 +719,15 @@ def dispatch_execute(
may be none
* ``DynamicJobSpec`` is returned when a dynamic workflow is executed
"""
if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)

# Invoked before the task is executed
new_user_params = self.pre_execute(ctx.user_space_params)

if self.enable_deck and ctx.user_space_params is not None:
if DeckField.TIMELINE.value in self.deck_fields:
ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck)
new_user_params = ctx.user_space_params.with_enable_deck(enable_deck=True).build()

# Create another execution context with the new user params, but let's keep the same working dir
with FlyteContextManager.with_context(
ctx.with_execution_state(
Expand Down Expand Up @@ -827,8 +830,19 @@ def disable_deck(self) -> bool:
"""
If true, this task will not output deck html file
"""
warnings.warn(
"`disable_deck` is deprecated and will be removed in the future.\n" "Please use `enable_deck` instead.",
DeprecationWarning,
)
return self._disable_deck

@property
def enable_deck(self) -> bool:
"""
If true, this task will output deck html file
"""
return not self._disable_deck

@property
def deck_fields(self) -> List[DeckField]:
"""
Expand Down
17 changes: 17 additions & 0 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class Builder(object):
logging: Optional[_logging.Logger] = None
task_id: typing.Optional[_identifier.Identifier] = None
output_metadata_prefix: Optional[str] = None
enable_deck: bool = False

def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.stats = current.stats if current else None
Expand All @@ -107,6 +108,7 @@ def __init__(self, current: typing.Optional[ExecutionParameters] = None):
self.raw_output_prefix = current.raw_output_prefix if current else None
self.task_id = current.task_id if current else None
self.output_metadata_prefix = current.output_metadata_prefix if current else None
self.enable_deck = current.enable_deck if current else False

def add_attr(self, key: str, v: typing.Any) -> ExecutionParameters.Builder:
self.attrs[key] = v
Expand All @@ -126,6 +128,7 @@ def build(self) -> ExecutionParameters:
raw_output_prefix=self.raw_output_prefix,
task_id=self.task_id,
output_metadata_prefix=self.output_metadata_prefix,
enable_deck=self.enable_deck,
**self.attrs,
)

Expand All @@ -147,6 +150,11 @@ def with_task_sandbox(self) -> Builder:
b.working_dir = task_sandbox_dir
return b

def with_enable_deck(self, enable_deck: bool) -> Builder:
b = self.new_builder(self)
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
b.enable_deck = enable_deck
return b

def builder(self) -> Builder:
return ExecutionParameters.Builder(current=self)

Expand All @@ -162,6 +170,7 @@ def __init__(
checkpoint=None,
decks=None,
task_id: typing.Optional[_identifier.Identifier] = None,
enable_deck: bool = False,
**kwargs,
):
"""
Expand Down Expand Up @@ -190,6 +199,7 @@ def __init__(
self._decks = decks
self._task_id = task_id
self._timeline_deck = None
self._enable_deck = enable_deck

@property
def stats(self) -> taggable.TaggableStats:
Expand Down Expand Up @@ -298,6 +308,13 @@ def timeline_deck(self) -> "TimeLineDeck": # type: ignore
self._timeline_deck = time_line_deck
return time_line_deck

@property
def enable_deck(self) -> bool:
"""
Returns whether deck is enabled or not
"""
return self._enable_deck

def __getattr__(self, attr_name: str) -> typing.Any:
"""
This houses certain task specific context. For example in Spark, it houses the SparkSession, etc
Expand Down
24 changes: 18 additions & 6 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ class Deck:
scatter plots or Markdown text. In addition, users can create new decks to render
their data with custom renderers.

.. warning::

This feature is in beta.

Comment on lines -44 to -47
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.

.. code-block:: python

iris_df = px.data.iris()
Expand Down Expand Up @@ -86,6 +82,19 @@ def name(self) -> str:
def html(self) -> str:
return self._html

@staticmethod
def publish():
params = FlyteContextManager.current_context().user_space_params
task_name = params.task_id.name

if not params.enable_deck:
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
logger.warning(
f"Attempted to call publish() in task '{task_name}', but Flyte decks will not be generated because enable_deck is currently set to False."
)
return

_output_deck(task_name=task_name, new_user_params=params)


class TimeLineDeck(Deck):
"""
Expand Down Expand Up @@ -148,7 +157,8 @@ def generate_time_table(data: dict) -> str:


def _get_deck(
new_user_params: ExecutionParameters, ignore_jupyter: bool = False
new_user_params: ExecutionParameters,
ignore_jupyter: bool = False,
) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore
"""
Get flyte deck html string
Expand Down Expand Up @@ -176,11 +186,12 @@ def _get_deck(

def _output_deck(task_name: str, new_user_params: ExecutionParameters):
ctx = FlyteContext.current_context()

local_dir = ctx.file_access.get_random_local_directory()
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}"
try:
with open(local_path, "w", encoding="utf-8") as f:
f.write(_get_deck(new_user_params, ignore_jupyter=True))
f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True))
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}")
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix)
Expand All @@ -197,6 +208,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters):
def get_deck_template() -> Template:
root = os.path.dirname(os.path.abspath(__file__))
templates_dir = os.path.join(root, "html", "template.html")

with open(templates_dir, "r") as f:
template_content = f.read()
return Template(template_content)
14 changes: 14 additions & 0 deletions flytekit/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flyteidl.core import tasks_pb2 as _core_task
from google.protobuf import json_format as _json_format
from google.protobuf import struct_pb2 as _struct
from google.protobuf.wrappers_pb2 import BoolValue
from kubernetes.client import ApiClient

from flytekit.models import common as _common
Expand Down Expand Up @@ -184,6 +185,7 @@ def __init__(
pod_template_name,
cache_ignore_input_vars,
is_eager: bool = False,
generates_deck: bool = False,
):
"""
Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts,
Expand All @@ -203,6 +205,7 @@ def __init__(
receive deprecation warnings.
:param bool cache_serializable: Whether or not caching operations are executed in serial. This means only a
single instance over identical inputs is executed, other concurrent executions wait for the cached results.
:param bool generates_deck: Whether the task will generate a Deck URI.
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
:param pod_template_name: The name of the existing PodTemplate resource which will be used in this task.
:param cache_ignore_input_vars: Input variables that should not be included when calculating hash for cache.
:param is_eager:
Expand All @@ -218,6 +221,7 @@ def __init__(
self._pod_template_name = pod_template_name
self._cache_ignore_input_vars = cache_ignore_input_vars
self._is_eager = is_eager
self._generates_deck = generates_deck

@property
def is_eager(self):
Expand Down Expand Up @@ -299,6 +303,14 @@ def pod_template_name(self):
"""
return self._pod_template_name

@property
def generates_deck(self) -> bool:
"""
Whether the task will generate a Deck.
:rtype: bool
"""
return self._generates_deck

@property
def cache_ignore_input_vars(self):
"""
Expand All @@ -322,6 +334,7 @@ def to_flyte_idl(self):
pod_template_name=self.pod_template_name,
cache_ignore_input_vars=self.cache_ignore_input_vars,
is_eager=self.is_eager,
generates_deck=BoolValue(value=self.generates_deck),
)
if self.timeout:
tm.timeout.FromTimedelta(self.timeout)
Expand All @@ -345,6 +358,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata):
pod_template_name=pb2_object.pod_template_name,
cache_ignore_input_vars=pb2_object.cache_ignore_input_vars,
is_eager=pb2_object.is_eager,
generates_deck=pb2_object.generates_deck.value if pb2_object.HasField("generates_deck") else False,
)


Expand Down
8 changes: 5 additions & 3 deletions flytekit/tools/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ def get_serializable_task(
entity.reset_command_fn()

entity_config = entity.get_config(settings) or {}

extra_config = {}

if hasattr(entity, "task_function") and isinstance(entity.task_function, ClassDecorator):
extra_config = entity.task_function.get_extra_config()
if hasattr(entity, "task_function"):
if isinstance(entity.task_function, ClassDecorator):
extra_config = entity.task_function.get_extra_config()
if entity.enable_deck:
entity.metadata.generates_deck = True

merged_config = {**entity_config, **extra_config}

Expand Down
2 changes: 0 additions & 2 deletions pydoclint-errors-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ flytekit/core/base_sql_task.py
DOC301: Class `SQLTask`: __init__() should not have a docstring; please combine it with the docstring of the class
--------------------
flytekit/core/base_task.py
DOC601: Class `TaskMetadata`: Class docstring contains fewer class attributes than actual class attributes. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
DOC603: Class `TaskMetadata`: Class docstring attributes are different from actual class attributes. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Attributes in the class definition but not in the docstring: [cache: bool, cache_ignore_input_vars: Tuple[str, ...], cache_serialize: bool, cache_version: str, deprecated: str, interruptible: Optional[bool], is_eager: bool, pod_template_name: Optional[str], retries: int, timeout: Optional[Union[datetime.timedelta, int]]]. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
DOC301: Class `PythonTask`: __init__() should not have a docstring; please combine it with the docstring of the class
DOC001: Function/method `post_execute`: Potential formatting errors in docstring. Error message: Expected a colon in 'rval is returned value from call to execute'. (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
DOC101: Method `PythonTask.post_execute`: Docstring contains fewer arguments than in function signature.
Expand Down
41 changes: 41 additions & 0 deletions tests/flytekit/unit/deck/test_deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import flytekit
from flytekit import Deck, FlyteContextManager, task

from flytekit.deck import DeckField, MarkdownRenderer, SourceCodeRenderer, TopFrameRenderer
from flytekit.deck.deck import _output_deck
from flytekit.deck.renderer import PythonDependencyRenderer
Expand Down Expand Up @@ -258,3 +259,43 @@ def test_python_dependency_renderer():

# Assert that the button of copy
assert 'button onclick="copyTable()"' in result

def test_enable_deck_in_task():
@task(enable_deck=True)
def t1():
ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == True
return

ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False

t1()

ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False
eapolinario marked this conversation as resolved.
Show resolved Hide resolved

def test_disable_deck_in_task():
@task(disable_deck=True)
def t1():
ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False
return

ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False
t1()
ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False

@task
def t2():
ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False
return

ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False
t2()
ctx = FlyteContextManager.current_context()
assert ctx.user_space_params.enable_deck == False
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading