Skip to content

Commit

Permalink
update actor examples
Browse files Browse the repository at this point in the history
  • Loading branch information
dansola committed Jan 8, 2025
1 parent 855cea4 commit 8d69049
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 43 deletions.
36 changes: 36 additions & 0 deletions user_guide/core_concepts/actors/byoc/caching_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from time import sleep
import os

import union

image = union.ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)

actor = union.ActorEnvironment(
name="my-actor",
container_image=image,
replica_count=1,
)


@actor.cache
def load_model(state: int) -> callable:
sleep(4) # simulate model loading
return lambda value: state + value


@actor.task
def evaluate(value: int, state: int) -> int:
model = load_model(state=state)
return model(value)


@union.workflow
def wf(init_value: int = 1, state: int = 3) -> int:
out = evaluate(value=init_value, state=state)
out = evaluate(value=out, state=state)
out = evaluate(value=out, state=state)
out = evaluate(value=out, state=state)
return out
60 changes: 60 additions & 0 deletions user_guide/core_concepts/actors/byoc/caching_custom_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from functools import partial
from pathlib import Path
from time import sleep
import os

import union

image = union.ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)

actor = union.ActorEnvironment(
name="my-actor",
container_image=image,
replica_count=2,
)


class MyModel:
"""Simple model that multiples value with model_state."""

def __init__(self, model_state: int):
self.model_state = model_state

def __call__(self, value: int):
return self.model_state * value


@task(container_image=image, cache=True, cache_version="v1")
def create_model_state() -> union.FlyteFile:
working_dir = Path(union.current_context().working_directory)
model_state_path = working_dir / "model_state.txt"
model_state_path.write_text("4")
return model_state_path


@actor.cache
def load_model(model_state_path: union.FlyteFile) -> MyModel:
# Simulate model loading time. This can take a long time
# because the FlyteFile download is large, or when the
# model is loaded onto the GPU.
sleep(10)
with model_state_path.open("r") as f:
model_state = int(f.read())

return MyModel(model_state=model_state)


@actor.task
def inference(value: int, model_state_path: union.FlyteFile) -> int:
model = load_model(model_state_path)
return model(value)


@workflow
def run_inference(values: list[int] = list(range(20))) -> list[int]:
model_state = create_model_state()
inference_ = partial(inference, model_state_path=model_state)
return union.map_task(inference_)(value=values)
47 changes: 47 additions & 0 deletions user_guide/core_concepts/actors/byoc/caching_map_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from time import sleep
import os

import union

image = union.ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)

actor = union.ActorEnvironment(
name="my-actor",
container_image=image,
replica_count=1,
)


class MyObj:
def __init__(self, state: int):
self.state = state

def __hash__(self):
return hash(self.state)

def __eq__(self, other):
return self.state == other.state


@actor.cache
def get_state(obj: MyObj) -> int:
sleep(2)
return obj.state


@actor.task
def construct_and_get_value(state: int) -> int:
obj = MyObj(state=state)
return get_state(obj)


@union.workflow
def wf(state: int = 2) -> int:
value = construct_and_get_value(state=state)
value = construct_and_get_value(state=value)
value = construct_and_get_value(state=value)
value = construct_and_get_value(state=value)
return value
9 changes: 4 additions & 5 deletions user_guide/core_concepts/actors/byoc/hello_world.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import os

from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment
import union

image = ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)

actor = ActorEnvironment(
actor = union.ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=30,
requests=Resources(
requests=union.Resources(
cpu="2",
mem="300Mi",
),
Expand All @@ -25,6 +24,6 @@ def say_hello() -> str:
return "hello"


@workflow
@union.workflow
def wf():
say_hello()
16 changes: 7 additions & 9 deletions user_guide/core_concepts/actors/byoc/multiple_tasks.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import os

from flytekit import current_context, workflow, LaunchPlan, Resources, ImageSpec
from union.actor import ActorEnvironment
import union

image = ImageSpec(
image = union.ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)

actor = ActorEnvironment(
actor = union.ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=30,
requests=Resources(cpu="1", mem="450Mi"),
requests=union.Resources(cpu="1", mem="450Mi"),
container_image=image,
)

Expand All @@ -27,16 +26,15 @@ def scream_hello(name: str) -> str:
return f"HELLO {name}"


@workflow
@union.workflow
def my_child_wf(name: str) -> str:
return scream_hello(name=name)


my_child_wf_lp = LaunchPlan.get_default_launch_plan(current_context(),
my_child_wf)
my_child_wf_lp = union.LaunchPlan.get_default_launch_plan(union.current_context(), my_child_wf)


@workflow
@union.workflow
def my_parent_wf(name: str) -> str:
a = say_hello(name=name)
b = my_child_wf(name=a)
Expand Down
11 changes: 5 additions & 6 deletions user_guide/core_concepts/actors/byoc/plus_one.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import os

from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment
import union

image = ImageSpec(
image = union.ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)

actor = ActorEnvironment(
actor = union.ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=300,
requests=Resources(cpu="2", mem="500Mi"),
requests=union.Resources(cpu="2", mem="500Mi"),
container_image=image,
)

Expand All @@ -22,7 +21,7 @@ def plus_one(input: int) -> int:
return input + 1


@workflow
@union.workflow
def wf(input: int = 0) -> int:
a = plus_one(input=input)
b = plus_one(input=a)
Expand Down
11 changes: 5 additions & 6 deletions user_guide/core_concepts/actors/pod_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
V1ResourceRequirements,
V1EnvVar,
)
from flytekit import workflow, ImageSpec, PodTemplate
from union.actor import ActorEnvironment
import union

image = ImageSpec(
image = union.ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union", "flytekitplugins-pod"],
)

pod_template = PodTemplate(
pod_template = union.PodTemplate(
primary_container_name="primary",
pod_spec=V1PodSpec(
containers=[
Expand All @@ -36,7 +35,7 @@
),
)

actor = ActorEnvironment(
actor = union.ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=30,
Expand All @@ -54,6 +53,6 @@ def check_set() -> str:
return os.getenv("RUN_KEY_EX")


@workflow
@union.workflow
def wf() -> tuple[str,str]:
return get_and_set(), check_set()
30 changes: 30 additions & 0 deletions user_guide/core_concepts/actors/serverless/caching_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from time import sleep

import union


actor = union.ActorEnvironment(
name="my-actor",
replica_count=1,
)


@actor.cache
def load_model(state: int) -> callable:
sleep(4) # simulate model loading
return lambda value: state + value


@actor.task
def evaluate(value: int, state: int) -> int:
model = load_model(state=state)
return model(value)


@union.workflow
def wf(init_value: int = 1, state: int = 3) -> int:
out = evaluate(value=init_value, state=state)
out = evaluate(value=out, state=state)
out = evaluate(value=out, state=state)
out = evaluate(value=out, state=state)
return out
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from functools import partial
from pathlib import Path
from time import sleep

import union

actor = union.ActorEnvironment(
name="my-actor",
replica_count=2,
)


class MyModel:
"""Simple model that multiples value with model_state."""

def __init__(self, model_state: int):
self.model_state = model_state

def __call__(self, value: int):
return self.model_state * value


@task(container_image=image, cache=True, cache_version="v1")
def create_model_state() -> union.FlyteFile:
working_dir = Path(union.current_context().working_directory)
model_state_path = working_dir / "model_state.txt"
model_state_path.write_text("4")
return model_state_path


@actor.cache
def load_model(model_state_path: union.FlyteFile) -> MyModel:
# Simulate model loading time. This can take a long time
# because the FlyteFile download is large, or when the
# model is loaded onto the GPU.
sleep(10)
with model_state_path.open("r") as f:
model_state = int(f.read())

return MyModel(model_state=model_state)


@actor.task
def inference(value: int, model_state_path: union.FlyteFile) -> int:
model = load_model(model_state_path)
return model(value)


@workflow
def run_inference(values: list[int] = list(range(20))) -> list[int]:
model_state = create_model_state()
inference_ = partial(inference, model_state_path=model_state)
return union.map_task(inference_)(value=values)
Loading

0 comments on commit 8d69049

Please sign in to comment.