Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pyflyte build imageSpec #1555

Merged
merged 47 commits into from
Apr 15, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a6c3dff
wip
pingsutw Mar 14, 2023
bbc258b
wip
pingsutw Mar 14, 2023
e746ff7
wip
pingsutw Mar 15, 2023
b41cc60
wip
pingsutw Mar 15, 2023
6815e58
wip
pingsutw Mar 15, 2023
ae3f9b5
wip
pingsutw Mar 16, 2023
b02b62f
wip
pingsutw Mar 16, 2023
d7c4796
wip
pingsutw Mar 16, 2023
45a0b5d
wip
pingsutw Mar 16, 2023
3bb4294
wip
pingsutw Mar 16, 2023
9a8f492
wip
pingsutw Mar 16, 2023
ae6dea6
wip
pingsutw Mar 17, 2023
b10c43a
pyflyte build
pingsutw Mar 18, 2023
22943e2
nit
pingsutw Mar 18, 2023
5c986aa
nit
pingsutw Mar 20, 2023
b25da68
wip
pingsutw Mar 23, 2023
0713e92
wip
pingsutw Mar 23, 2023
4834ba0
wip
pingsutw Mar 23, 2023
0fbe2a8
Merge branch 'master' of github.com:flyteorg/flytekit into imageSpec
pingsutw Mar 23, 2023
2ed22b8
Support serialize and package
pingsutw Mar 23, 2023
96dfaba
more tests
pingsutw Mar 24, 2023
486bd7f
wip
pingsutw Mar 26, 2023
d66891f
Merged master
pingsutw Mar 27, 2023
2e0428f
wip
pingsutw Mar 28, 2023
e1f7e31
wip
pingsutw Mar 31, 2023
c3ec7a4
move to plugin
pingsutw Mar 31, 2023
2057c03
test
pingsutw Mar 31, 2023
79c3dfa
nit
pingsutw Mar 31, 2023
2d8906f
test
pingsutw Mar 31, 2023
7ae7d97
test
pingsutw Mar 31, 2023
d6ce291
merged master
pingsutw Mar 31, 2023
a40503a
test
pingsutw Mar 31, 2023
23fd933
test
pingsutw Mar 31, 2023
9648898
wip
pingsutw Mar 31, 2023
d5e5ae8
wip
pingsutw Apr 5, 2023
e3acbf7
wip
pingsutw Apr 7, 2023
b929bc7
Merge branch 'master' of github.com:flyteorg/flytekit into imageSpec
pingsutw Apr 7, 2023
f2b064c
wip
pingsutw Apr 7, 2023
035e425
nit
pingsutw Apr 7, 2023
76ae966
fixed tested
pingsutw Apr 7, 2023
5c2a85b
fixed tested
pingsutw Apr 7, 2023
a8967a5
more tests
pingsutw Apr 8, 2023
4df984b
Add support passing yaml in pyflyte run
pingsutw Apr 13, 2023
f52f417
lint
pingsutw Apr 13, 2023
7866aa8
lint
pingsutw Apr 13, 2023
42c0f2c
nit
pingsutw Apr 13, 2023
176bd37
nit
pingsutw Apr 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions flytekit/clis/sdk_in_container/build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os
import pathlib
from collections import OrderedDict
from pathlib import Path

import click

from flytekit.clis.sdk_in_container.run import get_entities_in_file, load_naive_entity
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.extend.image_spec.base_image import build_docker_image, calculate_hash_from_image_spec
from flytekit.tools.script_mode import _find_project_root, hash_file
from flytekit.tools.translator import get_serializable

# Help text passed to click for the `build` command defined below.
_build_help = """Build a image for the flyte task or workflow."""
pingsutw marked this conversation as resolved.
Show resolved Hide resolved


@click.command("build", help=_build_help)
@click.option(
    "--file",
    required=True,
    type=str,
    help="task or workflow file",
)
@click.pass_context
def build(_: click.Context, file: str):
    """
    $ pyflyte build --file wf.py

    Parse all the imageSpecs in this file, and build the image.

    @task(image_spec=ImageSpec(...))
    def t1():
        ...

    :param _: click context (unused).
    :param file: path to the python file that defines the tasks/workflows.
    """
    rel_path = os.path.relpath(file)
    entities = get_entities_in_file(pathlib.Path(file).resolve())  # type: ignore
    project_root = _find_project_root(file)
    # The dotted module name depends only on the file path, so derive it once
    # instead of on every loop iteration.
    module = os.path.splitext(rel_path)[0].replace(os.path.sep, ".")

    for entity in entities.all():
        exe_entity = load_naive_entity(module, entity, project_root)
        # Not every loadable entity is guaranteed to expose `image_spec`;
        # treat a missing attribute the same as "no image spec" and skip it.
        image_spec = getattr(exe_entity, "image_spec", None)
        if image_spec is None:
            continue
        image_name = f"{image_spec.registry}/flytekit"
        build_docker_image(image_spec, name=image_name, tag=calculate_hash_from_image_spec(image_spec))
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from flytekit import configuration
from flytekit.clis.sdk_in_container.backfill import backfill
from flytekit.clis.sdk_in_container.build import build
from flytekit.clis.sdk_in_container.constants import CTX_CONFIG_FILE, CTX_PACKAGES, CTX_VERBOSE
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.local_cache import local_cache
Expand Down Expand Up @@ -132,6 +133,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(run)
main.add_command(register)
main.add_command(backfill)
main.add_command(build)
main.epilog

if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions flytekit/configuration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ class SerializationSettings(object):
python_interpreter: str = DEFAULT_RUNTIME_PYTHON_INTERPRETER
flytekit_virtualenv_root: Optional[str] = None
fast_serialization_settings: Optional[FastSerializationSettings] = None
source_root: Optional[str] = None

def __post_init__(self):
if self.flytekit_virtualenv_root is None:
Expand Down
6 changes: 6 additions & 0 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
task_config: T,
task_type="python-task",
container_image: Optional[str] = None,
image_spec: Optional["ImageSpec"] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
environment: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(
**kwargs,
)
self._container_image = container_image
self._image_spec = image_spec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a check that you can't have both specified?

# TODO(katrogan): Implement resource overrides
self._resources = ResourceSpec(
requests=requests if requests else Resources(), limits=limits if limits else Resources()
Expand Down Expand Up @@ -126,6 +128,10 @@ def task_resolver(self) -> TaskResolverMixin:
def container_image(self) -> Optional[str]:
return self._container_image

@property
def image_spec(self) -> Optional[ImageSpec]:
return self._image_spec

@property
def resources(self) -> ResourceSpec:
return self._resources
Expand Down
4 changes: 4 additions & 0 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.core.reference_entity import ReferenceEntity, TaskReference
from flytekit.core.resources import Resources
from flytekit.extend.image_spec.base_image import ImageSpec
from flytekit.models.documentation import Documentation
from flytekit.models.security import Secret

Expand Down Expand Up @@ -85,6 +86,7 @@ def task(
deprecated: str = "",
timeout: Union[_datetime.timedelta, int] = 0,
container_image: Optional[str] = None,
image_spec: Optional[ImageSpec] = None,
environment: Optional[Dict[str, str]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
Expand Down Expand Up @@ -170,6 +172,7 @@ def foo():
@task(container_image='{{.images.xyz.fqn}}:{{images.default.tag}}')
def foo2():
...
:param image_spec: Define a image spec for building a docker image
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
:param environment: Environment variables that should be added for this tasks execution
:param requests: Specify compute resource requests for your task. For Pod-plugin tasks, these values will apply only
to the primary container.
Expand Down Expand Up @@ -205,6 +208,7 @@ def wrapper(fn) -> PythonFunctionTask:
fn,
metadata=_metadata,
container_image=container_image,
image_spec=image_spec,
environment=environment,
requests=requests,
limits=limits,
Expand Down
12 changes: 12 additions & 0 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from flytekit.core.type_engine import TypeEngine
from flytekit.exceptions import scopes as exception_scopes
from flytekit.exceptions.user import FlyteValidationException, FlyteValueException
from flytekit.extend.image_spec.base_image import ImageSpec
from flytekit.loggers import logger
from flytekit.models import interface as _interface_models
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -170,6 +171,7 @@ def __init__(
workflow_metadata_defaults: WorkflowMetadataDefaults,
python_interface: Interface,
docs: Optional[Documentation] = None,
image_spec: Optional[ImageSpec] = None,
**kwargs,
):
self._name = name
Expand All @@ -182,6 +184,7 @@ def __init__(
self._nodes: List[Node] = []
self._output_bindings: List[_literal_models.Binding] = []
self._docs = docs
self._image_spec = image_spec

if self._python_interface.docstring:
if self.docs is None:
Expand All @@ -208,6 +211,10 @@ def name(self) -> str:
def docs(self):
return self._docs

@property
def image_spec(self) -> ImageSpec:
return self._image_spec

@property
def short_name(self) -> str:
return extract_obj_name(self._name)
Expand Down Expand Up @@ -603,6 +610,7 @@ def __init__(
default_metadata: WorkflowMetadataDefaults,
docstring: Optional[Docstring] = None,
docs: Optional[Documentation] = None,
image_spec: Optional[ImageSpec] = None,
):
name, _, _, _ = extract_task_module(workflow_function)
self._workflow_function = workflow_function
Expand All @@ -618,6 +626,7 @@ def __init__(
workflow_metadata_defaults=default_metadata,
python_interface=native_interface,
docs=docs,
image_spec=image_spec,
)
self.compiled = False

Expand Down Expand Up @@ -727,6 +736,7 @@ def workflow(
failure_policy: Optional[WorkflowFailurePolicy] = None,
interruptible: bool = False,
docs: Optional[Documentation] = None,
image_spec: Optional[ImageSpec] = None,
) -> WorkflowBase:
"""
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG
Expand Down Expand Up @@ -756,6 +766,7 @@ def workflow(
:param failure_policy: Use the options in flytekit.WorkflowFailurePolicy
:param interruptible: Whether or not tasks launched from this workflow are by default interruptible
:param docs: Description entity for the workflow
:param image_spec: image definition for the workflow
"""

def wrapper(fn):
Expand All @@ -769,6 +780,7 @@ def wrapper(fn):
default_metadata=workflow_metadata_defaults,
docstring=Docstring(callable_=fn),
docs=docs,
image_spec=image_spec,
)
update_wrapper(workflow_instance, fn)
return workflow_instance
Expand Down
2 changes: 0 additions & 2 deletions flytekit/extend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,4 @@
from flytekit.core.promise import Promise
from flytekit.core.python_customized_container_task import PythonCustomizedContainerTask
from flytekit.core.shim_task import ExecutableTemplateShimTask, ShimTaskExecutor
from flytekit.core.task import TaskPlugins
from flytekit.core.type_engine import DictTransformer, T, TypeEngine, TypeTransformer
from flytekit.tools.translator import get_serializable
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
Empty file.
112 changes: 112 additions & 0 deletions flytekit/extend/image_spec/base_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import base64
import hashlib
import json
import os
import pathlib
import subprocess
from dataclasses import dataclass
from typing import List, Optional

from dataclasses_json import dataclass_json

# Path of the lock file (<home>/.flyte/image.lock) that the lock helpers in
# this module read and write.
IMAGE_LOCK = f"{os.path.expanduser('~')}{os.path.sep}.flyte{os.path.sep}image.lock"


@dataclass_json
@dataclass
class ImageSpec:
    """
    Declarative description of a container image to build.

    Args:
        packages: list of packages that will be installed in the image.
        base_image: image the build starts from.
        registry: docker registry. if it's specified, flytekit will push the image.
        python_version: python version in the image.
    """

    packages: List[str]
    base_image: str = "pingsutw/envd_base:v8"
    registry: Optional[str] = None
    python_version: Optional[str] = "3.9"
pingsutw marked this conversation as resolved.
Show resolved Hide resolved


def create_envd_config(image_spec: ImageSpec) -> str:
    """
    Render an envd build file for ``image_spec`` and write it to a fresh local path.

    :param image_spec: supplies the base image, python version and package list.
    :return: path of the ``build.envd`` file that was written.
    """
    # Each package becomes a quoted entry followed by ", " (the trailing
    # separator after the last entry is kept).
    quoted_packages = "".join(f'"{pkg}", ' for pkg in image_spec.packages)

    build_file_content = f"""# syntax=v1

def build():
    base(image="{image_spec.base_image}", dev=False)
    install.python_packages(name = [{quoted_packages}])
    install.python(version="{image_spec.python_version}")
    runtime.environ(env={{"PYTHONPATH": "/"}})
"""
    from flytekit.core import context_manager

    current_ctx = context_manager.FlyteContextManager.current_context()
    cfg_path = current_ctx.file_access.get_random_local_path("build.envd")
    target_dir = pathlib.Path(cfg_path).parent
    target_dir.mkdir(parents=True, exist_ok=True)

    # Mode "x" creates the file and fails if it already exists.
    with open(cfg_path, "x") as build_file:
        build_file.write(build_file_content)

    return cfg_path


def build_docker_image(image_spec: ImageSpec, name: str, tag: str):
    """
    Build (and push) the image described by ``image_spec`` by running ``envd build``.

    Nothing is built when ``should_build_image`` reports that this registry/tag
    pair is already recorded in the lock file. After a successful build the
    pair is recorded via ``update_lock_file``.

    :param image_spec: image definition; its ``registry`` is used for the lock-file lookup.
    :param name: image name passed to envd (without the tag).
    :param tag: image tag passed to envd.
    :raises Exception: if the ``envd build`` process exits with a non-zero status.
    """
    if should_build_image(image_spec.registry, tag) is False:
        return

    cfg_path = create_envd_config(image_spec)
    print("building image...")
    p = subprocess.run(
        [
            "envd",
            "build",
            "--path",
            f"{pathlib.Path(cfg_path).parent}",
            "--output",
            f"type=image,name={name}:{tag},push=true",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # The exit status is the success signal: a process may write to stderr and
    # still succeed, or fail without writing anything to stderr.
    if p.returncode != 0:
        error_output = p.stderr.decode("utf-8", errors="replace")
        raise Exception(
            f"failed to build the imageSpec at {cfg_path} with error {error_output}",
        )

    update_lock_file(image_spec.registry, tag)


def calculate_hash_from_image_spec(image_spec: ImageSpec):
    """
    Derive an image tag from the JSON form of ``image_spec``.

    The tag is the URL-safe base64 encoding of the MD5 digest of
    ``image_spec.to_json()``, with every "=" replaced by ".".
    """
    spec_bytes = image_spec.to_json().encode("utf-8")
    digest = hashlib.md5(spec_bytes).digest()
    encoded = base64.urlsafe_b64encode(digest).decode("ascii")
    # docker tag can't contain "="
    return encoded.replace("=", ".")


def should_build_image(registry: str, tag: str) -> bool:
    """
    Tell whether ``tag`` still has to be built for ``registry``.

    Returns True when the lock file does not exist, or when it does not list
    ``tag`` under ``registry``; returns False otherwise.
    """
    if not os.path.isfile(IMAGE_LOCK):
        return True

    with open(IMAGE_LOCK, "r") as lock_file:
        recorded = json.load(lock_file)

    already_recorded = registry in recorded and tag in recorded[registry]
    return not already_recorded


def update_lock_file(registry: str, tag: str):
    """
    Update the ~/.flyte/image.lock. It contains all the image tags we have pushed,
    grouped by registry. If the file does not exist, a new one is created.

    :param registry: registry the image was pushed to; used as the top-level key.
    :param tag: image tag to record under ``registry``.
    """
    lock_path = pathlib.Path(IMAGE_LOCK)

    # A missing lock file just means nothing has been recorded yet.
    try:
        with open(lock_path, "r") as f:
            checkpoints = json.load(f)
    except FileNotFoundError:
        checkpoints = {}

    tags = checkpoints.setdefault(registry, [])
    if tag not in tags:
        tags.append(tag)

    # The parent directory may not exist yet when the file is first written.
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    with open(lock_path, "w") as o:
        o.write(json.dumps(checkpoints))
6 changes: 4 additions & 2 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,6 @@ def raw_register(
return None

if isinstance(cp_entity, task_models.TaskSpec):
if isinstance(cp_entity, FlyteTask):
wild-endeavor marked this conversation as resolved.
Show resolved Hide resolved
version = cp_entity.id.version
ident = self._resolve_identifier(ResourceType.TASK, cp_entity.template.id.name, version, settings)
try:
self.client.create_task(task_identifer=ident, task_spec=cp_entity)
Expand Down Expand Up @@ -623,6 +621,10 @@ def _serialize_and_register(
version=version,
)
is_dummy_serialization_setting = True

if serialization_settings.version is None:
serialization_settings.version = version

_ = get_serializable(m, settings=serialization_settings, entity=entity, options=options)

ident = None
Expand Down
2 changes: 1 addition & 1 deletion flytekit/tools/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def serialize(
:param pkgs: Dot-delimited Python packages/subpackages to look into for serialization.
:param local_source_root: Where to start looking for the code.
"""

settings.source_root = local_source_root
ctx = FlyteContextManager.current_context().with_serialization_settings(settings)
with FlyteContextManager.with_context(ctx) as ctx:
# Scan all modules. the act of loading populates the global singleton that contains all objects
Expand Down
2 changes: 1 addition & 1 deletion flytekit/tools/script_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def hash_file(file_path: typing.Union[os.PathLike, str]) -> (bytes, str):
return h.digest(), h.hexdigest()


def _find_project_root(source_path) -> Path:
def _find_project_root(source_path) -> str:
"""
Find the root of the project.
The root of the project is considered to be the first ancestor from source_path that does
Expand Down
Loading