External Plugin Service (grpc) #1524

Merged
merged 58 commits into from
May 6, 2023
Commits
2437e6a
wip
kumare3 Oct 13, 2022
063dd3b
updated
kumare3 Oct 14, 2022
cd14658
Add bq plugin and refactor
pingsutw Jan 21, 2023
93a4ed2
Add dummy plugin for performance testing
pingsutw Jan 24, 2023
a2a5305
nit
pingsutw Jan 24, 2023
f6b0d81
nit
pingsutw Jan 25, 2023
84ffbfe
test
pingsutw Jan 25, 2023
b39fe48
test
pingsutw Jan 25, 2023
8ba641c
test
pingsutw Feb 16, 2023
bda6432
test
pingsutw Feb 18, 2023
609f852
wip
pingsutw Feb 22, 2023
c06662d
wip
pingsutw Feb 23, 2023
625548b
Add grpc server
pingsutw Feb 23, 2023
c446900
nit
pingsutw Feb 24, 2023
787031e
nit
pingsutw Feb 24, 2023
996552c
grpc plugin
pingsutw Feb 24, 2023
ce60f20
nit
pingsutw Feb 24, 2023
18af9ac
nit
pingsutw Feb 24, 2023
047b7f1
nit
pingsutw Feb 24, 2023
cf9cf3e
nit
pingsutw Feb 25, 2023
82c048f
grpc server
pingsutw Feb 27, 2023
19c198c
grpc server
pingsutw Feb 27, 2023
d66c3f8
clean up
pingsutw Feb 27, 2023
5f0084f
nit
pingsutw Feb 27, 2023
2edc620
test
pingsutw Feb 27, 2023
fd2b9b3
nit
pingsutw Mar 1, 2023
e71b6f6
update port
pingsutw Mar 13, 2023
2b76331
wip
pingsutw Mar 13, 2023
2f598b2
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Mar 13, 2023
c342d37
nit
pingsutw Mar 13, 2023
ee4a180
update port
pingsutw Mar 14, 2023
1a908c4
update
pingsutw Mar 14, 2023
764c0f5
update get request
pingsutw Mar 14, 2023
f402d57
more tets
pingsutw Mar 14, 2023
bc30f51
remove prev state
pingsutw Mar 14, 2023
1c16952
nit
pingsutw Mar 14, 2023
f044e28
error handling
pingsutw Mar 27, 2023
8385e02
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Mar 27, 2023
1dd716d
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Mar 31, 2023
59714f9
wip
pingsutw Mar 31, 2023
26eab42
wip
pingsutw Mar 31, 2023
4ee1417
fixed test
pingsutw Mar 31, 2023
a70c12e
fixed test
pingsutw Mar 31, 2023
dbd26b5
more tests
pingsutw Apr 9, 2023
2c1cce8
more tests
pingsutw Apr 9, 2023
5a2bdc4
more tests
pingsutw Apr 9, 2023
1fba9d4
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Apr 9, 2023
ae8c37e
lint
pingsutw Apr 9, 2023
0b151cf
nit
pingsutw Apr 12, 2023
9f8337d
lint
pingsutw Apr 13, 2023
4b92275
nit
pingsutw Apr 13, 2023
f28183e
merged master
pingsutw Apr 21, 2023
e07c72d
update
pingsutw May 4, 2023
9cadb80
merged master
pingsutw May 5, 2023
0357806
update
pingsutw May 5, 2023
f594df9
nit
pingsutw May 5, 2023
c524560
port
pingsutw May 5, 2023
f11dd2b
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw May 5, 2023
42 changes: 42 additions & 0 deletions .github/workflows/pythonpublish.yml
@@ -142,6 +142,48 @@ jobs:
          cache-from: type=gha
          cache-to: type=gha,mode=max

  build-and-push-external-plugin-service-images:
    runs-on: ubuntu-latest
    needs: deploy
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: "0"
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      - name: Set up Docker Buildx
        id: buildx
        uses: docker/setup-buildx-action@v1
      - name: Login to GitHub Container Registry
        if: ${{ github.event_name == 'release' }}
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: "${{ secrets.FLYTE_BOT_USERNAME }}"
          password: "${{ secrets.FLYTE_BOT_PAT }}"
      - name: Prepare External Plugin Service Image Names
        id: external-plugin-service-names
        uses: docker/metadata-action@v3
        with:
          images: |
            ghcr.io/${{ github.repository_owner }}/external-plugin-service
          tags: |
            latest
            ${{ github.sha }}
            ${{ needs.deploy.outputs.version }}
      - name: Push External Plugin Service Image to GitHub Registry
        uses: docker/build-push-action@v2
        with:
          context: "."
          platforms: linux/arm64, linux/amd64
          push: ${{ github.event_name == 'release' }}
          tags: ${{ steps.external-plugin-service-names.outputs.tags }}
          build-args: |
            VERSION=${{ needs.deploy.outputs.version }}
          file: ./Dockerfile.external-plugin-service
          cache-from: type=gha
          cache-to: type=gha,mode=max

  build-and-push-spark-images:
    runs-on: ubuntu-latest
    needs: deploy
10 changes: 10 additions & 0 deletions Dockerfile.external-plugin-service
@@ -0,0 +1,10 @@
FROM python:3.9-slim-buster

Contributor

Should we build one for each Python version?

Member Author

I think we can start with a single default version because this is an experimental feature. We can add more versions when we need them.


MAINTAINER Flyte Team <[email protected]>
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

ARG VERSION
RUN pip install -U flytekit==$VERSION \
  flytekitplugins-bigquery==$VERSION

Contributor

Why is BigQuery special?

Member Author

Because I added a new backend plugin for BQ in this PR.


CMD pyflyte serve --port 8000
2 changes: 1 addition & 1 deletion doc-requirements.txt
@@ -244,7 +244,7 @@ flask==2.2.3
     # via mlflow
 flatbuffers==23.1.21
     # via tensorflow
-flyteidl==1.3.12
+flyteidl==1.3.16
     # via flytekit
 fonttools==4.38.0
     # via matplotlib
2 changes: 1 addition & 1 deletion flytekit/clients/friendly.py
@@ -1007,7 +1007,7 @@ def get_upload_signed_url(

     def get_download_signed_url(
         self, native_url: str, expires_in: datetime.timedelta = None
-    ) -> _data_proxy_pb2.CreateUploadLocationResponse:
+    ) -> _data_proxy_pb2.CreateDownloadLocationRequest:
         expires_in_pb = None
         if expires_in:
             expires_in_pb = Duration()
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
@@ -15,6 +15,7 @@
from flytekit.clis.sdk_in_container.register import register
from flytekit.clis.sdk_in_container.run import run
from flytekit.clis.sdk_in_container.serialize import serialize
from flytekit.clis.sdk_in_container.serve import serve
from flytekit.configuration.internal import LocalSDK
from flytekit.exceptions.base import FlyteException
from flytekit.exceptions.user import FlyteInvalidInputException
@@ -134,6 +135,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(run)
main.add_command(register)
main.add_command(backfill)
main.add_command(serve)
main.add_command(build)
main.add_command(launchplan)
main.epilog
46 changes: 46 additions & 0 deletions flytekit/clis/sdk_in_container/serve.py
@@ -0,0 +1,46 @@
from concurrent import futures

import click
import grpc
from flyteidl.service.external_plugin_service_pb2_grpc import add_ExternalPluginServiceServicer_to_server

from flytekit.extend.backend.external_plugin_service import BackendPluginServer

_serve_help = """Start a grpc server for the external plugin service."""


@click.command("serve", help=_serve_help)
@click.option(
    "--port",
    default="8000",
    is_flag=False,
    type=int,
    help="Grpc port for the external plugin service",
)
@click.option(
    "--worker",
    default="10",
    is_flag=False,
    type=int,
    help="Number of workers for the grpc server",
)
@click.option(
    "--timeout",
    default=None,
    is_flag=False,
    type=int,
    help="It will wait for the specified number of seconds before shutting down grpc server. It should only be used "
    "for testing.",
)
@click.pass_context
def serve(_: click.Context, port, worker, timeout):
    """
    Start a grpc server for the external plugin service.
    """
    click.secho("Starting the external plugin service...", fg="blue")
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker))
    add_ExternalPluginServiceServicer_to_server(BackendPluginServer(), server)

    server.add_insecure_port(f"[::]:{port}")
    server.start()
    server.wait_for_termination(timeout=timeout)
Empty file.
107 changes: 107 additions & 0 deletions flytekit/extend/backend/base_plugin.py
@@ -0,0 +1,107 @@
import typing
from abc import ABC, abstractmethod

import grpc
from flyteidl.core.tasks_pb2 import TaskTemplate
from flyteidl.service.external_plugin_service_pb2 import (
    RETRYABLE_FAILURE,
    RUNNING,
    SUCCEEDED,
    State,
    TaskCreateResponse,
    TaskDeleteResponse,
    TaskGetResponse,
)

from flytekit import logger
from flytekit.models.literals import LiteralMap


class BackendPluginBase(ABC):
    """
    This is the base class for all backend plugins. It defines the interface that all plugins must implement.
    The external plugins service will be run either locally or in a pod, and will be responsible for
    invoking backend plugins. The propeller will communicate with the external plugins service
    to create tasks, get the status of tasks, and delete tasks.

    All the backend plugins should be registered in the BackendPluginRegistry. External plugins service
    will look up the plugin based on the task type. Every task type can only have one plugin.
    """

    def __init__(self, task_type: str):
        self._task_type = task_type

    @property
    def task_type(self) -> str:
        """
        task_type is the name of the task type that this plugin supports.
        """
        return self._task_type

    @abstractmethod
    def create(
        self,
        context: grpc.ServicerContext,
        output_prefix: str,
        task_template: TaskTemplate,
        inputs: typing.Optional[LiteralMap] = None,
    ) -> TaskCreateResponse:
        """
        Return a Unique ID for the task that was created. It should return error code if the task creation failed.
        """
        pass

Contributor

Like the fact that this is supposed to return a unique job ID.

    @abstractmethod
    def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse:
        """
        Return the status of the task, and return the outputs in some cases. For example, bigquery job
        can't write the structured dataset to the output location, so it returns the output literals to the propeller,
        and the propeller will write the structured dataset to the blob store.
        """
        pass

    @abstractmethod
    def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteResponse:
        """
        Delete the task. This call should be idempotent.
        """
        pass


class BackendPluginRegistry(object):
    """
    This is the registry for all backend plugins. The external plugins service will look up the plugin
    based on the task type.
    """

    _REGISTRY: typing.Dict[str, BackendPluginBase] = {}

    @staticmethod
    def register(plugin: BackendPluginBase):
        if plugin.task_type in BackendPluginRegistry._REGISTRY:
            raise ValueError(f"Duplicate plugin for task type {plugin.task_type}")
        BackendPluginRegistry._REGISTRY[plugin.task_type] = plugin
        logger.info(f"Registering backend plugin for task type {plugin.task_type}")

    @staticmethod
    def get_plugin(context: grpc.ServicerContext, task_type: str) -> typing.Optional[BackendPluginBase]:
        if task_type not in BackendPluginRegistry._REGISTRY:
            logger.error(f"Cannot find backend plugin for task type [{task_type}]")
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"Cannot find backend plugin for task type [{task_type}]")
            return None
        return BackendPluginRegistry._REGISTRY[task_type]


def convert_to_flyte_state(state: str) -> State:
    """
    Convert the state from the backend plugin to the state in flyte.
    """
    state = state.lower()
    if state in ["failed"]:
        return RETRYABLE_FAILURE
    elif state in ["done", "succeeded"]:
        return SUCCEEDED
    elif state in ["running"]:
        return RUNNING
    raise ValueError(f"Unrecognized state: {state}")
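
To make the contract in `base_plugin.py` easier to try out, here is a minimal sketch of a plugin written against a simplified version of the interface. It is not code from this PR: `State`, `PluginBase`, `InMemoryPlugin` and `Registry` are hypothetical stand-ins that use only the standard library, so the grpc context and the flyteidl response types are left out. The state mapping mirrors `convert_to_flyte_state` above.

```python
import uuid
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional


class State(Enum):
    # Stand-in for flyteidl's State enum; only the members used in the diff.
    RETRYABLE_FAILURE = 0
    RUNNING = 1
    SUCCEEDED = 2


def convert_to_flyte_state(state: str) -> State:
    # Same mapping as the function in the diff, against the stand-in enum.
    state = state.lower()
    if state in ["failed"]:
        return State.RETRYABLE_FAILURE
    elif state in ["done", "succeeded"]:
        return State.SUCCEEDED
    elif state in ["running"]:
        return State.RUNNING
    raise ValueError(f"Unrecognized state: {state}")


class PluginBase(ABC):
    # Simplified BackendPluginBase: no grpc context, plain return types.
    def __init__(self, task_type: str):
        self.task_type = task_type

    @abstractmethod
    def create(self, output_prefix: str) -> str: ...

    @abstractmethod
    def get(self, job_id: str) -> State: ...

    @abstractmethod
    def delete(self, job_id: str) -> None: ...


class InMemoryPlugin(PluginBase):
    # Hypothetical plugin that tracks jobs in a dict instead of a real backend.
    def __init__(self):
        super().__init__(task_type="in_memory")
        self._jobs: Dict[str, str] = {}

    def create(self, output_prefix: str) -> str:
        job_id = uuid.uuid4().hex  # unique ID handed back to the caller
        self._jobs[job_id] = "RUNNING"
        return job_id

    def get(self, job_id: str) -> State:
        return convert_to_flyte_state(self._jobs[job_id])

    def delete(self, job_id: str) -> None:
        # pop with a default keeps delete idempotent
        self._jobs.pop(job_id, None)


class Registry:
    # One plugin per task type, as in BackendPluginRegistry.
    _REGISTRY: Dict[str, PluginBase] = {}

    @staticmethod
    def register(plugin: PluginBase) -> None:
        if plugin.task_type in Registry._REGISTRY:
            raise ValueError(f"Duplicate plugin for task type {plugin.task_type}")
        Registry._REGISTRY[plugin.task_type] = plugin

    @staticmethod
    def get_plugin(task_type: str) -> Optional[PluginBase]:
        return Registry._REGISTRY.get(task_type)


Registry.register(InMemoryPlugin())
plugin = Registry.get_plugin("in_memory")
job_id = plugin.create(output_prefix="/tmp/out")
print(plugin.get(job_id))
plugin.delete(job_id)
plugin.delete(job_id)  # second delete is a no-op
```

Registering a second `InMemoryPlugin` would raise `ValueError`, which matches the "every task type can only have one plugin" rule in the docstring.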
53 changes: 53 additions & 0 deletions flytekit/extend/backend/external_plugin_service.py
@@ -0,0 +1,53 @@
import grpc
from flyteidl.service.external_plugin_service_pb2 import (
    PERMANENT_FAILURE,
    TaskCreateRequest,
    TaskCreateResponse,
    TaskDeleteRequest,
    TaskDeleteResponse,
    TaskGetRequest,
    TaskGetResponse,
)
from flyteidl.service.external_plugin_service_pb2_grpc import ExternalPluginServiceServicer

from flytekit import logger
from flytekit.extend.backend.base_plugin import BackendPluginRegistry
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


class BackendPluginServer(ExternalPluginServiceServicer):
    def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) -> TaskCreateResponse:
        try:
            tmp = TaskTemplate.from_flyte_idl(request.template)
            inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None
            plugin = BackendPluginRegistry.get_plugin(context, tmp.type)
            if plugin is None:
                return TaskCreateResponse()
            return plugin.create(context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp)
        except Exception as e:
            logger.error(f"failed to create task with error {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"failed to create task with error {e}")

    def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> TaskGetResponse:
        try:
            plugin = BackendPluginRegistry.get_plugin(context, request.task_type)
            if plugin is None:
                return TaskGetResponse(state=PERMANENT_FAILURE)
            return plugin.get(context=context, job_id=request.job_id)
        except Exception as e:
            logger.error(f"failed to get task with error {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"failed to get task with error {e}")

    def DeleteTask(self, request: TaskDeleteRequest, context: grpc.ServicerContext) -> TaskDeleteResponse:
        try:
            plugin = BackendPluginRegistry.get_plugin(context, request.task_type)
            if plugin is None:
                return TaskDeleteResponse()
            return plugin.delete(context=context, job_id=request.job_id)
        except Exception as e:
            logger.error(f"failed to delete task with error {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"failed to delete task with error {e}")
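
The three handlers in `external_plugin_service.py` share one pattern: look up the plugin, report `NOT_FOUND` on a miss, and turn any exception into `INTERNAL`. The sketch below reproduces that flow for the `GetTask` case only. It is an illustration under stated assumptions rather than the PR's code: `StatusCode` and `FakeContext` are stand-ins for `grpc.StatusCode` and `grpc.ServicerContext`, and `PLUGINS`, `healthy` and `broken` are hypothetical names, so the block runs without grpc or flyteidl installed.

```python
from enum import Enum
from typing import Callable, Dict, Optional


class StatusCode(Enum):
    # Stand-in for grpc.StatusCode; only the members used here.
    OK = 0
    NOT_FOUND = 5
    INTERNAL = 13


class FakeContext:
    # Stand-in for grpc.ServicerContext that records what the handler sets.
    def __init__(self):
        self.code = StatusCode.OK
        self.details = ""

    def set_code(self, code: StatusCode) -> None:
        self.code = code

    def set_details(self, details: str) -> None:
        self.details = details


# Hypothetical registry: task type -> callable returning a job state string.
PLUGINS: Dict[str, Callable[[str], str]] = {}


def get_plugin(context: FakeContext, task_type: str) -> Optional[Callable[[str], str]]:
    # Mirrors BackendPluginRegistry.get_plugin: a miss is reported on the context.
    if task_type not in PLUGINS:
        context.set_code(StatusCode.NOT_FOUND)
        context.set_details(f"Cannot find backend plugin for task type [{task_type}]")
        return None
    return PLUGINS[task_type]


def get_task(context: FakeContext, task_type: str, job_id: str) -> Optional[str]:
    # Mirrors GetTask: unknown task type -> permanent failure, exception -> INTERNAL.
    try:
        plugin = get_plugin(context, task_type)
        if plugin is None:
            return "PERMANENT_FAILURE"
        return plugin(job_id)
    except Exception as e:
        context.set_code(StatusCode.INTERNAL)
        context.set_details(f"failed to get task with error {e}")
        return None


def healthy(job_id: str) -> str:
    return "RUNNING"


def broken(job_id: str) -> str:
    raise RuntimeError("backend unavailable")


PLUGINS["healthy"] = healthy
PLUGINS["broken"] = broken

for task_type in ("healthy", "broken", "missing"):
    ctx = FakeContext()
    result = get_task(ctx, task_type, job_id="job-1")
    print(task_type, result, ctx.code.name, ctx.details)
```

Note that, as in the diff, the exception path sets the status on the context and returns nothing, so the caller has to read the status code rather than the response.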
@@ -11,4 +11,5 @@
BigQueryTask
"""

from .backend_plugin import BigQueryPlugin
from .task import BigQueryConfig, BigQueryTask