Skip to content

Commit

Permalink
Add make targets to build and register workflows (#7)
Browse files Browse the repository at this point in the history
- [X] Add make targets to register sandbox, staging & production workflows
- [X] Add docker build script
- [X] Add dynamic workflow examples with retries and failure ratios
  • Loading branch information
EngHabu authored Feb 7, 2020
1 parent 99544df commit decddd1
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 12 deletions.
4 changes: 2 additions & 2 deletions flytetools/flytetester/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ COPY . /root
RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/flytekit_venv
RUN chmod a+x /usr/local/bin/flytekit_venv

ARG tag
ARG DOCKER_IMAGE

# Environment variables we want to bake into the image
# These are configuration options that can also be specified in a file. See flytekit.config as an examples

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ENV FLYTE_INTERNAL_IMAGE $tag
ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
51 changes: 48 additions & 3 deletions flytetools/flytetester/Makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,60 @@
export IMAGE_NAME=flytetester
VERSION=$(shell ./version.sh)
PROJECT=flytetester
DOMAIN=development

serialize:
pyflyte -p ${PROJECT} -d ${DOMAIN} serialize workflows

register:
pyflyte -p ${PROJECT} -d ${DOMAIN} register workflows

execute:
pyflyte -p ${PROJECT} -d ${DOMAIN} lp execute app-workflows-work-one-python-task-w-f

.PHONY: register_staging_in_container
register_staging_in_container:
pyflyte -p ${PROJECT} -d ${DOMAIN} --config /root/staging.config register workflows


.PHONY: register_staging
register_staging: docker_push
docker run -e FLYTE_CREDENTIALS_CLIENT_ID=${FLYTE_CREDENTIALS_CLIENT_ID} \
-e FLYTE_CREDENTIALS_CLIENT_SECRET=${FLYTE_CREDENTIALS_CLIENT_SECRET} \
-e FLYTE_CREDENTIALS_AUTH_MODE=basic -e FLYTE_CREDENTIALS_AUTHORIZATION_METADATA_KEY=flyte-authorization \
-e FLYTE_CREDENTIALS_SCOPE=svc -e FLYTE_PLATFORM_AUTH=True \
docker.io/lyft/${IMAGE_NAME}:${VERSION} /usr/local/bin/flytekit_venv make register_staging_in_container

.PHONY: register_production_in_container
register_production_in_container:
pyflyte -p ${PROJECT} -d ${DOMAIN} --config /root/production.config register workflows

.PHONY: register_production
register_production: docker_push
docker run -e FLYTE_CREDENTIALS_CLIENT_ID=${FLYTE_CREDENTIALS_CLIENT_ID} \
-e FLYTE_CREDENTIALS_CLIENT_SECRET=${FLYTE_CREDENTIALS_CLIENT_SECRET} \
-e FLYTE_CREDENTIALS_AUTH_MODE=basic -e FLYTE_CREDENTIALS_AUTHORIZATION_METADATA_KEY=flyte-authorization \
-e FLYTE_CREDENTIALS_SCOPE=svc -e FLYTE_PLATFORM_AUTH=True \
docker.io/lyft/${IMAGE_NAME}:${VERSION} /usr/local/bin/flytekit_venv make register_production_in_container

.PHONY: register_production_in_container
register_sandbox_in_container:
pyflyte -p ${PROJECT} -d ${DOMAIN} --config /root/sandbox.config register workflows

.PHONY: register_production
register_sandbox: docker_push
docker run docker.io/lyft/${IMAGE_NAME}:${VERSION} /usr/local/bin/flytekit_venv make register_sandbox_in_container

.PHONY: end2end
end2end_test:
flytekit_venv end2end/run.sh

.PHONY: docker_build
docker_build:
scripts/docker_build.sh

.PHONY: docker_push
docker_push:
REGISTRY=docker.io/lyft scripts/docker_build.sh

.PHONY: docker_build_push
docker_build_push:
docker_build
docker_push
96 changes: 96 additions & 0 deletions flytetools/flytetester/app/workflows/dynamic_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from __future__ import absolute_import
from __future__ import print_function

import time
from flytekit.sdk.tasks import (
python_task,
dynamic_task,
inputs,
outputs,
)
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input
from six import moves as _six_moves


# Uncacheable
@inputs(wf_input=Types.Float, cache_disabled=Types.Boolean)
@outputs(generated=Types.Float)
@python_task
def generate_input(wf_params, wf_input, cache_disabled, generated):
if cache_disabled:
generated.set(time.time())
else:
generated.set(wf_input)


@inputs(caching_input=Types.Float)
@outputs(out_ints=[Types.Integer])
@dynamic_task(cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi")
def sample_batch_task_sq(wf_params, caching_input, out_ints):
res2 = []
for i in _six_moves.range(0, 3):
task = sq_sub_task(in1=i)
yield task
res2.append(task.outputs.out1)
out_ints.set(res2)


@inputs(caching_input=Types.Float)
@outputs(out_str=[Types.String], out_ints=[[Types.Integer]])
@dynamic_task(cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi")
def sample_batch_task_cachable(wf_params, caching_input, out_str, out_ints):
res = ["I'm the first result"]
for i in _six_moves.range(0, 3):
task = sub_task(in1=i)
yield task
res.append(task.outputs.out1)
res.append("I'm after each sub-task result")
res.append("I'm the last result")

res2 = []
for i in _six_moves.range(0, 3):
task = int_sub_task(in1=i)
yield task
res2.append(task.outputs.out1)

# Nested batch tasks
task = sample_batch_task_sq()
yield task
res2.append(task.outputs.out_ints)

task = sample_batch_task_sq()
yield task
res2.append(task.outputs.out_ints)

out_str.set(res)
out_ints.set(res2)


@inputs(caching_input=Types.Float, in1=Types.Integer)
@outputs(out1=Types.String)
@python_task(cache=True, cache_version='1.0', cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi")
def sub_task(wf_params, caching_input, in1, out1):
out1.set("hello {}".format(in1))


@inputs(caching_input=Types.Float, in1=Types.Integer)
@outputs(out1=[Types.Integer])
@python_task(cache=True, cache_version='1.0',cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi")
def int_sub_task(wf_params, caching_input, in1, out1):
out1.set([in1, in1 * 2, in1 * 3])


@inputs(caching_input=Types.Float, in1=Types.Integer)
@outputs(out1=Types.Integer)
@python_task(cache=True, cache_version='1.0',cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi")
def sq_sub_task(wf_params, caching_input, in1, out1):
out1.set(in1 * in1)


@workflow_class
class OptionallyCachableWorkflow(object):
input_if_cached_enabled = Input(Types.Float, default=10.0, help="Test float input with default")
cache_disabled = Input(Types.Boolean, default=False, help="Whether to disable cache.")
input_generator = generate_input(wf_input=input_if_cached_enabled, cache_disabled=cache_disabled)
dynamic_task = sample_batch_task_cachable(caching_input=input_generator.outputs.generated)
42 changes: 42 additions & 0 deletions flytetools/flytetester/app/workflows/failure_ratio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import absolute_import
from __future__ import print_function

from flytekit.common.exceptions.base import FlyteRecoverableException
from flytekit.common.types.containers import List
from flytekit.sdk.tasks import (
dynamic_task,
python_task, inputs, outputs)
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output


@outputs(out_int=Types.Integer)
@inputs(idx=Types.Integer)
@python_task
def fail_if_4_5(wf_params, idx, out_int):
if idx == 4 or idx == 5:
wf_params.logging.info("This task will fail")
raise FlyteRecoverableException('This task is supposed to fail')

wf_params.logging.info("This task should succeed")
out_int.set(idx)


@outputs(out_ints=[Types.Integer])
@dynamic_task(cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi", retries=2,
allowed_failure_ratio=0.3)
def failure_ratio_task_gen(wf_params, out_ints):
res = []
for i in range(0, 10):
t = fail_if_4_5(idx=i)
yield t
res.append(t.outputs.out_int)

out_ints.set(res)


@workflow_class
class FailureRatioWorkflow(object):
dynamic_task = failure_ratio_task_gen()

out_ints = Output(dynamic_task.outputs.out_ints, List(Types.Integer), help="The output ints.")
24 changes: 24 additions & 0 deletions flytetools/flytetester/app/workflows/retryable_dynamic_nodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import absolute_import
from __future__ import print_function

from app.workflows.failing_workflows import retryer
from flytekit.sdk.tasks import (
dynamic_task,
)
from flytekit.sdk.workflow import workflow_class


@dynamic_task(cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi", retries=2)
def sample_dynamic_task(wf_params):
yield retryer()
yield retryer()


@dynamic_task(cpu_request="200m", cpu_limit="200m", memory_request="500Mi", memory_limit="500Mi", retries=2)
def sample_dynamic_task_recursive(wf_params):
yield sample_dynamic_task()


@workflow_class
class RetryableDynamicWorkflow(object):
dynamic_task = sample_dynamic_task_recursive()
10 changes: 4 additions & 6 deletions flytetools/flytetester/end2end/end2end.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ workflow_packages=app.workflows
python_venv=flytekit_venv

[auth]
assumable_iam_role=arn:aws:iam::123456789101:role/flytesandboxtests-development
assumable_iam_role=arn:aws:iam::173840052742:role/modelbuilderapibatchworker-development

[platform]
url=flyteadmin:81
insecure=True
url=flyte-staging.lyft.net

[aws]
endpoint=http://minio:9000
access_key_id=minio
secret_access_key=miniostorage
s3_shard_formatter=s3://lyft-modelbuilder/{}/
s3_shard_string_length=2
13 changes: 13 additions & 0 deletions flytetools/flytetester/production.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[sdk]
workflow_packages=app.workflows
python_venv=flytekit_venv

[auth]
assumable_iam_role=arn:aws:iam::173840052742:role/modelbuilderapibatchworker-development

[platform]
url=flyte.lyft.net

[aws]
s3_shard_formatter=s3://lyft-modelbuilder/{}/
s3_shard_string_length=2
2 changes: 1 addition & 1 deletion flytetools/flytetester/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
flytekit[sidecar,schema]==0.3.0
flytekit[sidecar,schema]==0.4.4
statsd
opencv-python==3.4.4.19
k8s-proto>=0.0.2
14 changes: 14 additions & 0 deletions flytetools/flytetester/sandbox.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[sdk]
workflow_packages=app.workflows
python_venv=flytekit_venv

[auth]
assumable_iam_role=arn:aws:iam::173840052742:role/modelbuilderapibatchworker-development

[platform]
url=host.docker.internal:30081
insecure=True

[aws]
s3_shard_formatter=s3://lyft-modelbuilder/{}/
s3_shard_string_length=2
63 changes: 63 additions & 0 deletions flytetools/flytetester/scripts/docker_build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env bash

# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES.
# ONLY EDIT THIS FILE FROM WITHIN THE 'LYFT/BOILERPLATE' REPOSITORY:
#
# TO OPT OUT OF UPDATES, SEE https://github.com/lyft/boilerplate/blob/master/Readme.rst

set -e

echo ""
echo "------------------------------------"
echo " DOCKER BUILD"
echo "------------------------------------"
echo ""

if [ -n "$REGISTRY" ]; then
# Do not push if there are unstaged git changes
CHANGED=$(git status --porcelain)
if [ -n "$CHANGED" ]; then
echo "Please commit git changes before pushing to a registry"
exit 1
fi
fi


GIT_SHA=$(git rev-parse HEAD)

IMAGE_TAG_WITH_SHA="${IMAGE_NAME}:${GIT_SHA}"

RELEASE_SEMVER=$(git describe --tags --exact-match "$GIT_SHA" 2>/dev/null) || true
if [ -n "$RELEASE_SEMVER" ]; then
IMAGE_TAG_WITH_SEMVER="${IMAGE_NAME}:${RELEASE_SEMVER}${IMAGE_TAG_SUFFIX}"
fi

# if REGISTRY specified, push the images to the remote registy
if [ -n "$REGISTRY" ]; then

# build the image
docker build -t "$IMAGE_TAG_WITH_SHA" --build-arg DOCKER_IMAGE="${REGISTRY}/${IMAGE_TAG_WITH_SHA}" .
echo "${IMAGE_TAG_WITH_SHA} built locally."

if [ -n "${DOCKER_REGISTRY_PASSWORD}" ]; then
docker login --username="$DOCKER_REGISTRY_USERNAME" --password="$DOCKER_REGISTRY_PASSWORD"
fi

docker tag "$IMAGE_TAG_WITH_SHA" "${REGISTRY}/${IMAGE_TAG_WITH_SHA}"

docker push "${REGISTRY}/${IMAGE_TAG_WITH_SHA}"
echo "${REGISTRY}/${IMAGE_TAG_WITH_SHA} pushed to remote."

# If the current commit has a semver tag, also push the images with the semver tag
if [ -n "$RELEASE_SEMVER" ]; then

docker tag "$IMAGE_TAG_WITH_SHA" "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER}"

docker push "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER}"
echo "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER} pushed to remote."
fi
else
# build the image
docker build -t "$IMAGE_TAG_WITH_SHA" --build-arg DOCKER_IMAGE="${REGISTRY}/${IMAGE_TAG_WITH_SHA}" .
echo "${IMAGE_TAG_WITH_SHA} built locally."
fi
13 changes: 13 additions & 0 deletions flytetools/flytetester/staging.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[sdk]
workflow_packages=app.workflows
python_venv=flytekit_venv

[auth]
assumable_iam_role=arn:aws:iam::173840052742:role/modelbuilderapibatchworker-development

[platform]
url=flyte-staging.lyft.net

[aws]
s3_shard_formatter=s3://lyft-modelbuilder/{}/
s3_shard_string_length=2
3 changes: 3 additions & 0 deletions flytetools/flytetester/version.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

echo $(git rev-parse HEAD)

0 comments on commit decddd1

Please sign in to comment.