External Plugin Service (grpc) #1524

Merged (58 commits)
May 6, 2023
Changes from 54 commits
Commits
2437e6a
wip
kumare3 Oct 13, 2022
063dd3b
updated
kumare3 Oct 14, 2022
cd14658
Add bq plugin and refactor
pingsutw Jan 21, 2023
93a4ed2
Add dummy plugin for performance testing
pingsutw Jan 24, 2023
a2a5305
nit
pingsutw Jan 24, 2023
f6b0d81
nit
pingsutw Jan 25, 2023
84ffbfe
test
pingsutw Jan 25, 2023
b39fe48
test
pingsutw Jan 25, 2023
8ba641c
test
pingsutw Feb 16, 2023
bda6432
test
pingsutw Feb 18, 2023
609f852
wip
pingsutw Feb 22, 2023
c06662d
wip
pingsutw Feb 23, 2023
625548b
Add grpc server
pingsutw Feb 23, 2023
c446900
nit
pingsutw Feb 24, 2023
787031e
nit
pingsutw Feb 24, 2023
996552c
grpc plugin
pingsutw Feb 24, 2023
ce60f20
nit
pingsutw Feb 24, 2023
18af9ac
nit
pingsutw Feb 24, 2023
047b7f1
nit
pingsutw Feb 24, 2023
cf9cf3e
nit
pingsutw Feb 25, 2023
82c048f
grpc server
pingsutw Feb 27, 2023
19c198c
grpc server
pingsutw Feb 27, 2023
d66c3f8
clean up
pingsutw Feb 27, 2023
5f0084f
nit
pingsutw Feb 27, 2023
2edc620
test
pingsutw Feb 27, 2023
fd2b9b3
nit
pingsutw Mar 1, 2023
e71b6f6
update port
pingsutw Mar 13, 2023
2b76331
wip
pingsutw Mar 13, 2023
2f598b2
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Mar 13, 2023
c342d37
nit
pingsutw Mar 13, 2023
ee4a180
update port
pingsutw Mar 14, 2023
1a908c4
update
pingsutw Mar 14, 2023
764c0f5
update get request
pingsutw Mar 14, 2023
f402d57
more tets
pingsutw Mar 14, 2023
bc30f51
remove prev state
pingsutw Mar 14, 2023
1c16952
nit
pingsutw Mar 14, 2023
f044e28
error handling
pingsutw Mar 27, 2023
8385e02
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Mar 27, 2023
1dd716d
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Mar 31, 2023
59714f9
wip
pingsutw Mar 31, 2023
26eab42
wip
pingsutw Mar 31, 2023
4ee1417
fixed test
pingsutw Mar 31, 2023
a70c12e
fixed test
pingsutw Mar 31, 2023
dbd26b5
more tests
pingsutw Apr 9, 2023
2c1cce8
more tests
pingsutw Apr 9, 2023
5a2bdc4
more tests
pingsutw Apr 9, 2023
1fba9d4
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw Apr 9, 2023
ae8c37e
lint
pingsutw Apr 9, 2023
0b151cf
nit
pingsutw Apr 12, 2023
9f8337d
lint
pingsutw Apr 13, 2023
4b92275
nit
pingsutw Apr 13, 2023
f28183e
merged master
pingsutw Apr 21, 2023
e07c72d
update
pingsutw May 4, 2023
9cadb80
merged master
pingsutw May 5, 2023
0357806
update
pingsutw May 5, 2023
f594df9
nit
pingsutw May 5, 2023
c524560
port
pingsutw May 5, 2023
f11dd2b
Merge branch 'master' of github.com:flyteorg/flytekit into backend-pl…
pingsutw May 5, 2023
42 changes: 42 additions & 0 deletions .github/workflows/pythonpublish.yml
@@ -142,6 +142,48 @@ jobs:
          cache-from: type=gha
          cache-to: type=gha,mode=max

  build-and-push-external-plugin-service-images:
    runs-on: ubuntu-latest
    needs: deploy
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: "0"
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      - name: Set up Docker Buildx
        id: buildx
        uses: docker/setup-buildx-action@v1
      - name: Login to GitHub Container Registry
        if: ${{ github.event_name == 'release' }}
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: "${{ secrets.FLYTE_BOT_USERNAME }}"
          password: "${{ secrets.FLYTE_BOT_PAT }}"
      - name: Prepare External Plugin Service Image Names
        id: external-plugin-service-names
        uses: docker/metadata-action@v3
        with:
          images: |
            ghcr.io/${{ github.repository_owner }}/flytekit
Contributor

Let's make a new package for this. We don't want to confuse people with the existing flytekit base image, right?

Member Author

Renamed it: flytekit-external-plugin-service:latest -> external-plugin-service:latest

          tags: |
            external-plugin-service-latest
            external-plugin-service-${{ github.sha }}
            external-plugin-service-${{ needs.deploy.outputs.version }}
      - name: Push External Plugin Service Image to GitHub Registry
        uses: docker/build-push-action@v2
        with:
          context: "."
          platforms: linux/arm64, linux/amd64
          push: ${{ github.event_name == 'release' }}
          tags: ${{ steps.external-plugin-service-names.outputs.tags }}
          build-args: |
            VERSION=${{ needs.deploy.outputs.version }}
          file: ./Dockerfile
          cache-from: type=gha
          cache-to: type=gha,mode=max

  build-and-push-spark-images:
    runs-on: ubuntu-latest
    needs: deploy
10 changes: 10 additions & 0 deletions Dockerfile.external-plugin-service
@@ -0,0 +1,10 @@
FROM python:3.9-slim-buster
Contributor

Should we build one for each Python version?

Member Author

I think we can start with a single default version because this is an experimental feature. We can add more versions when we need them.


MAINTAINER Flyte Team <[email protected]>
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

ARG VERSION
RUN pip install -U flytekit==$VERSION \
flytekitplugins-bigquery==$VERSION \
Contributor

Why is bigquery special?

Member Author

Because I added a new backend plugin for BQ in this PR.


CMD pyflyte serve --port 80
2 changes: 1 addition & 1 deletion doc-requirements.txt
@@ -244,7 +244,7 @@ flask==2.2.3
    # via mlflow
flatbuffers==23.1.21
    # via tensorflow
-flyteidl==1.3.12
+flyteidl==1.3.16
    # via flytekit
fonttools==4.38.0
    # via matplotlib
2 changes: 1 addition & 1 deletion flytekit/clients/friendly.py
@@ -1007,7 +1007,7 @@ def get_upload_signed_url(

    def get_download_signed_url(
        self, native_url: str, expires_in: datetime.timedelta = None
-    ) -> _data_proxy_pb2.CreateUploadLocationResponse:
+    ) -> _data_proxy_pb2.CreateDownloadLocationRequest:
        expires_in_pb = None
        if expires_in:
            expires_in_pb = Duration()
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
@@ -15,6 +15,7 @@
from flytekit.clis.sdk_in_container.register import register
from flytekit.clis.sdk_in_container.run import run
from flytekit.clis.sdk_in_container.serialize import serialize
+from flytekit.clis.sdk_in_container.serve import serve
from flytekit.configuration.internal import LocalSDK
from flytekit.exceptions.base import FlyteException
from flytekit.exceptions.user import FlyteInvalidInputException
@@ -134,6 +135,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(run)
main.add_command(register)
main.add_command(backfill)
+main.add_command(serve)
main.add_command(build)
main.add_command(launchplan)
main.epilog
46 changes: 46 additions & 0 deletions flytekit/clis/sdk_in_container/serve.py
@@ -0,0 +1,46 @@
from concurrent import futures

import click
import grpc
from flyteidl.service.external_plugin_service_pb2_grpc import add_ExternalPluginServiceServicer_to_server

from flytekit.extend.backend.external_plugin_service import BackendPluginServer

_serve_help = """Start a grpc server for the external plugin service."""


@click.command("serve", help=_serve_help)
@click.option(
    "--port",
    default="80",
Contributor

Is this a bit too overreaching? Should we use 30087 or 30090? The existing ports are listed at https://github.com/flyteorg/flytectl/blob/bd6b85605f49ff4877484edd69942ca72f4e19dd/pkg/docker/docker_util.go#L135

Member Author (pingsutw, May 5, 2023)

Admin also uses port 80, so maybe set the NodePort to 30090:

flyteadmin                   ClusterIP   10.96.119.241   <none>        80/TCP,81/TCP,87/TCP,10254/TCP             2d

Collaborator

This is the pod port, right? Having this as port 80 might be annoying in local tests.

Also, this will be exposed through a Service, which we're doing in flyteorg/flyte#3454. Reading that PR, though, I don't get why we need to use NodePort services. Isn't the external plugin service accessed only by propeller?

Member Author

With NodePort, people are able to submit jobs from their local machine. It is only for testing.

    is_flag=False,
    type=int,
    help="Grpc port for the external plugin service",
)
@click.option(
    "--worker",
    default="10",
    is_flag=False,
    type=int,
    help="Number of workers for the grpc server",
)
@click.option(
    "--timeout",
    default=None,
    is_flag=False,
    type=int,
    help="It will wait for the specified number of seconds before shutting down grpc server. It should only be used "
    "for testing.",
)
@click.pass_context
def serve(_: click.Context, port, worker, timeout):
    """
    Start a grpc server for the external plugin service.
    """
    click.secho("Starting the external plugin service...", fg="blue")
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker))
    add_ExternalPluginServiceServicer_to_server(BackendPluginServer(), server)

    server.add_insecure_port(f"[::]:{port}")
    server.start()
    server.wait_for_termination(timeout=timeout)
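
The review thread on `--port` notes that port 80 can be awkward in local tests. One possible workaround, sketched here with only the standard library, is to ask the OS for a free port and pass it to `pyflyte serve --port`. The helper names `pick_free_port` and `serve_command` are illustrations and are not part of this PR; only the `--port` and `--worker` options come from the diff.

```python
import socket
from typing import List


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the kernel choose a free port
        return sock.getsockname()[1]


def serve_command(port: int, worker: int = 10) -> List[str]:
    """Build the CLI invocation using the options defined in serve.py."""
    return ["pyflyte", "serve", "--port", str(port), "--worker", str(worker)]


if __name__ == "__main__":
    print(" ".join(serve_command(pick_free_port())))
```

The port is released before the server binds it, so another process could in principle grab it in between; for a local test that window is usually acceptable.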
Empty file.
66 changes: 66 additions & 0 deletions flytekit/extend/backend/base_plugin.py
@@ -0,0 +1,66 @@
import typing
from abc import ABC, abstractmethod

import grpc
from flyteidl.core.tasks_pb2 import TaskTemplate
from flyteidl.service.external_plugin_service_pb2 import (
    RETRYABLE_FAILURE,
    RUNNING,
    SUCCEEDED,
    State,
    TaskCreateResponse,
    TaskDeleteResponse,
    TaskGetResponse,
)

from flytekit.models.literals import LiteralMap


class BackendPluginBase(ABC):
    def __init__(self, task_type: str):
        self._task_type = task_type

    @property
    def task_type(self) -> str:
        return self._task_type

    @abstractmethod
    def create(
        self,
        context: grpc.ServicerContext,
        output_prefix: str,
        task_template: TaskTemplate,
        inputs: typing.Optional[LiteralMap] = None,
    ) -> TaskCreateResponse:
        pass
Contributor

like the fact that this is supposed to return a unique job id.


    @abstractmethod
    def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse:
        pass

    @abstractmethod
    def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteResponse:
        pass


class BackendPluginRegistry(object):
    _REGISTRY: typing.Dict[str, BackendPluginBase] = {}

    @staticmethod
    def register(plugin: BackendPluginBase):
        BackendPluginRegistry._REGISTRY[plugin.task_type] = plugin

    @staticmethod
    def get_plugin(task_type: str) -> BackendPluginBase:
        return BackendPluginRegistry._REGISTRY[task_type]
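
To illustrate how the registry above is meant to be used, here is a minimal sketch with standard-library stand-ins. `PluginBase` and `Registry` mirror the shape of `BackendPluginBase` and `BackendPluginRegistry` but drop the gRPC and flyteidl types; `EchoPlugin` and its `create(payload)` signature are hypothetical and exist only for this example.

```python
from abc import ABC, abstractmethod
from typing import Dict


class PluginBase(ABC):
    """Stand-in for BackendPluginBase, reduced to what the registry needs."""

    def __init__(self, task_type: str):
        self._task_type = task_type

    @property
    def task_type(self) -> str:
        return self._task_type

    @abstractmethod
    def create(self, payload: str) -> str:
        """Start a job and return its unique job id."""


class Registry:
    """Stand-in for BackendPluginRegistry: one plugin instance per task type."""

    _REGISTRY: Dict[str, PluginBase] = {}

    @staticmethod
    def register(plugin: PluginBase) -> None:
        Registry._REGISTRY[plugin.task_type] = plugin

    @staticmethod
    def get_plugin(task_type: str) -> PluginBase:
        return Registry._REGISTRY[task_type]


class EchoPlugin(PluginBase):
    """Hypothetical plugin used only for this illustration."""

    def __init__(self):
        super().__init__(task_type="echo")

    def create(self, payload: str) -> str:
        return f"echo-{payload}"


Registry.register(EchoPlugin())
job_id = Registry.get_plugin("echo").create("42")
```

Looking up a task type that was never registered raises `KeyError`; in this PR the server's `except Exception` blocks would turn that into an `INTERNAL` status. Registering a second plugin under the same task type silently replaces the first.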


def convert_to_flyte_state(state: str) -> State:
    state = state.lower()
    if state in ["failed"]:
        return RETRYABLE_FAILURE
    elif state in ["done", "succeeded"]:
        return SUCCEEDED
    elif state in ["running"]:
        return RUNNING
    raise ValueError(f"Unrecognized state: {state}")
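
The state mapping can be exercised without flyteidl. The sketch below is a table-driven variant that accepts the same strings as `convert_to_flyte_state`; the `State` enum is a stand-in that contains only the three members imported in `base_plugin.py`, and `_STATE_MAP` is a name introduced here for illustration.

```python
from enum import Enum


class State(Enum):
    """Stand-in for the flyteidl State enum (only the members used in the diff)."""

    RETRYABLE_FAILURE = "RETRYABLE_FAILURE"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"


# Same accepted strings as the if/elif chain in the diff.
_STATE_MAP = {
    "failed": State.RETRYABLE_FAILURE,
    "done": State.SUCCEEDED,
    "succeeded": State.SUCCEEDED,
    "running": State.RUNNING,
}


def convert_to_flyte_state(state: str) -> State:
    """Case-insensitive lookup; unknown states raise ValueError."""
    key = state.lower()
    if key not in _STATE_MAP:
        raise ValueError(f"Unrecognized state: {key}")
    return _STATE_MAP[key]
```

Any state outside these four strings raises `ValueError`. As far as I know, BigQuery can also report a job as `PENDING`, which this mapping would reject, so a backend that emits extra states probably needs its own entries.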
42 changes: 42 additions & 0 deletions flytekit/extend/backend/external_plugin_service.py
@@ -0,0 +1,42 @@
import grpc
from flyteidl.service.external_plugin_service_pb2 import (
    TaskCreateRequest,
    TaskCreateResponse,
    TaskDeleteRequest,
    TaskDeleteResponse,
    TaskGetRequest,
    TaskGetResponse,
)
from flyteidl.service.external_plugin_service_pb2_grpc import ExternalPluginServiceServicer

from flytekit.extend.backend import model
from flytekit.extend.backend.base_plugin import BackendPluginRegistry


class BackendPluginServer(ExternalPluginServiceServicer):
    def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) -> TaskCreateResponse:
        try:
            req = model.TaskCreateRequest.from_flyte_idl(request)
            plugin = BackendPluginRegistry.get_plugin(req.template.type)
            return plugin.create(
                context=context, inputs=req.inputs, output_prefix=req.output_prefix, task_template=req.template
            )
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"failed to create task with error {e}")

    def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> TaskGetResponse:
        try:
            plugin = BackendPluginRegistry.get_plugin(request.task_type)
            return plugin.get(context=context, job_id=request.job_id)
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"failed to get task with error {e}")

    def DeleteTask(self, request: TaskDeleteRequest, context: grpc.ServicerContext) -> TaskDeleteResponse:
        try:
            plugin = BackendPluginRegistry.get_plugin(request.task_type)
            return plugin.delete(context=context, job_id=request.job_id)
        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"failed to delete task with error {e}")
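
The three RPC methods repeat the same try/except pattern: on any exception they set an `INTERNAL` status and a details string on the context, then fall through and return `None`. The sketch below shows that pattern factored into a decorator. It is one possible refactoring, not what the PR does; `report_errors`, `FakeContext`, `StatusCode`, and `Server` are hypothetical stand-ins so the example runs without grpc installed.

```python
import functools
from enum import Enum


class StatusCode(Enum):
    """Stand-in for grpc.StatusCode; only the member used in the diff."""

    INTERNAL = "internal"


class FakeContext:
    """Records what a handler sets, mimicking set_code/set_details on a servicer context."""

    def __init__(self):
        self.code = None
        self.details = None

    def set_code(self, code):
        self.code = code

    def set_details(self, details):
        self.details = details


def report_errors(action: str):
    """Hypothetical decorator that factors out the repeated try/except."""

    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(self, request, context):
            try:
                return handler(self, request, context)
            except Exception as e:
                context.set_code(StatusCode.INTERNAL)
                context.set_details(f"failed to {action} task with error {e}")
                return None

        return wrapper

    return decorator


class Server:
    @report_errors("get")
    def GetTask(self, request, context):
        # Simulates a lookup failure, e.g. no plugin registered for the task type.
        raise KeyError(request)
```

With this stand-in, `Server().GetTask("unknown", FakeContext())` returns `None` and leaves the code and details on the context, which matches the behavior of the handlers in the diff.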
40 changes: 40 additions & 0 deletions flytekit/extend/backend/model.py
@@ -0,0 +1,40 @@
from typing import Optional

from flyteidl.service import external_plugin_service_pb2

from flytekit.models import common, task
from flytekit.models.literals import LiteralMap


class TaskCreateRequest(common.FlyteIdlEntity):
Contributor

My vote would be to get rid of this wrapper class and use the underlying IDL object directly. I started doing that once the pyi files were in.

Member Author

remove it

    def __init__(self, output_prefix: str, template: task.TaskTemplate, inputs: Optional[LiteralMap] = None):
        self._output_prefix = output_prefix
        self._template = template
        self._inputs = inputs

    @property
    def output_prefix(self) -> str:
        return self._output_prefix

    @property
    def template(self) -> task.TaskTemplate:
        return self._template

    @property
    def inputs(self) -> Optional[LiteralMap]:
        return self._inputs

    def to_flyte_idl(self) -> external_plugin_service_pb2.TaskCreateRequest:
        return external_plugin_service_pb2.TaskCreateRequest(
            output_prefix=self.output_prefix,
            template=self.template.to_flyte_idl(),
            inputs=self.inputs.to_flyte_idl(),
        )

    @classmethod
    def from_flyte_idl(cls, proto):
        return cls(
            output_prefix=proto.output_prefix,
            template=task.TaskTemplate.from_flyte_idl(proto.template),
            inputs=LiteralMap.from_flyte_idl(proto.inputs) if proto.inputs is not None else None,
        )
@@ -11,4 +11,5 @@
BigQueryTask
"""

from .backend_plugin import BigQueryPlugin
from .task import BigQueryConfig, BigQueryTask
@@ -0,0 +1,93 @@
import datetime
from typing import Dict, Optional

import grpc
from flyteidl.service.external_plugin_service_pb2 import (
    SUCCEEDED,
    TaskCreateResponse,
    TaskDeleteResponse,
    TaskGetResponse,
)
from google.cloud import bigquery

from flytekit import FlyteContextManager, StructuredDataset
from flytekit.core.type_engine import TypeEngine
from flytekit.extend.backend.base_plugin import BackendPluginBase, BackendPluginRegistry, convert_to_flyte_state
from flytekit.models import literals
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.models.types import LiteralType, StructuredDatasetType

pythonTypeToBigQueryType: Dict[type, str] = {
    # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_sizes
    list: "ARRAY",
    bool: "BOOL",
    bytes: "BYTES",
    datetime.datetime: "DATETIME",
    float: "FLOAT64",
    int: "INT64",
    str: "STRING",
}
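
As a rough illustration of how this mapping feeds the query parameters built in `create`, the sketch below turns native inputs into `(name, bigquery_type, value)` triples, the same three values the diff passes to `bigquery.ScalarQueryParameter`. It uses only the standard library; `to_parameter_specs` is a hypothetical helper, and the explicit `TypeError` is an assumption added here (the code in the diff would raise `KeyError` for an unmapped type).

```python
import datetime
from typing import Any, Dict, List, Tuple

# Copied from the diff: Python type -> BigQuery standard SQL type name.
PYTHON_TYPE_TO_BIGQUERY_TYPE: Dict[type, str] = {
    list: "ARRAY",
    bool: "BOOL",
    bytes: "BYTES",
    datetime.datetime: "DATETIME",
    float: "FLOAT64",
    int: "INT64",
    str: "STRING",
}


def to_parameter_specs(
    native_inputs: Dict[str, Any], python_types: Dict[str, type]
) -> List[Tuple[str, str, Any]]:
    """Pair each input with its BigQuery type name, looked up by declared Python type."""
    specs = []
    for name, value in native_inputs.items():
        py_type = python_types[name]
        if py_type not in PYTHON_TYPE_TO_BIGQUERY_TYPE:
            raise TypeError(f"No BigQuery type mapping for input '{name}' of type {py_type!r}")
        specs.append((name, PYTHON_TYPE_TO_BIGQUERY_TYPE[py_type], value))
    return specs
```

The lookup uses the declared type rather than `isinstance`, so a `bool` input maps to `BOOL` even though `bool` is a subclass of `int`.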


class BigQueryPlugin(BackendPluginBase):
    def __init__(self):
        super().__init__(task_type="bigquery_query_job_task")

    def create(
        self,
        context: grpc.ServicerContext,
        output_prefix: str,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
    ) -> TaskCreateResponse:
        job_config = None
        if inputs:
            ctx = FlyteContextManager.current_context()
            python_interface_inputs = {
                name: TypeEngine.guess_python_type(lt.type) for name, lt in task_template.interface.inputs.items()
            }
            native_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, python_interface_inputs)

            job_config = bigquery.QueryJobConfig(
                query_parameters=[
                    bigquery.ScalarQueryParameter(name, pythonTypeToBigQueryType[python_interface_inputs[name]], val)
                    for name, val in native_inputs.items()
                ]
            )

        custom = task_template.custom
        client = bigquery.Client(project=custom["ProjectID"], location=custom["Location"])
        query_job = client.query(task_template.sql.statement, job_config=job_config)

        return TaskCreateResponse(job_id=str(query_job.job_id))

    def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse:
        client = bigquery.Client()
        job = client.get_job(job_id)
        cur_state = convert_to_flyte_state(str(job.state))
        res = None

        if cur_state == SUCCEEDED:
            ctx = FlyteContextManager.current_context()
            output_location = f"bq://{job.destination.project}:{job.destination.dataset_id}.{job.destination.table_id}"
            res = literals.LiteralMap(
                {
                    "results": TypeEngine.to_literal(
                        ctx,
                        StructuredDataset(uri=output_location),
                        StructuredDataset,
                        LiteralType(structured_dataset_type=StructuredDatasetType(format="")),
                    )
                }
            )

        return TaskGetResponse(state=cur_state, outputs=res.to_flyte_idl())

    def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteResponse:
        client = bigquery.Client()
        client.cancel_job(job_id)
        return TaskDeleteResponse()


BackendPluginRegistry.register(BigQueryPlugin())
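
In `get` above, `res` stays `None` until the job has succeeded, so `res.to_flyte_idl()` would raise `AttributeError` for a job that is still running. A minimal sketch of one way to guard that, using dataclass stand-ins instead of the BigQuery and flyteidl types; `Destination`, `GetResponse`, and `build_get_response` are hypothetical names, and the string states replace the flyteidl enum for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Destination:
    """Stand-in for job.destination (project, dataset_id, table_id)."""

    project: str
    dataset_id: str
    table_id: str


@dataclass
class GetResponse:
    """Stand-in for TaskGetResponse."""

    state: str
    outputs: Optional[Dict[str, str]] = None


def output_location(dest: Destination) -> str:
    """Same URI layout as the f-string in the diff."""
    return f"bq://{dest.project}:{dest.dataset_id}.{dest.table_id}"


def build_get_response(state: str, dest: Optional[Destination]) -> GetResponse:
    """Attach outputs only once the job has succeeded; otherwise report the state alone."""
    if state == "SUCCEEDED" and dest is not None:
        return GetResponse(state=state, outputs={"results": output_location(dest)})
    return GetResponse(state=state)
```

Calling `build_get_response("RUNNING", None)` returns a response with no outputs instead of failing, while a succeeded job carries the `bq://` URI under the `results` key.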
3 changes: 3 additions & 0 deletions plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py
@@ -81,3 +81,6 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
    def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
        sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
        return sql
+
+    def execute(self, **kwargs) -> Any:
+        raise Exception("Cannot run a SQL Task natively, please mock.")
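
Since `execute` now raises when called locally, tests that touch a SQL task need to mock it. A minimal sketch with `unittest.mock` from the standard library; `FakeSQLTask` and `run_pipeline` are hypothetical stand-ins, and the returned URI is made up for the example.

```python
from unittest import mock


class FakeSQLTask:
    """Hypothetical stand-in for a SQL task whose execute() refuses to run locally."""

    def execute(self, **kwargs):
        raise Exception("Cannot run a SQL Task natively, please mock.")


def run_pipeline(task: FakeSQLTask) -> str:
    """Hypothetical caller that depends on the task's result."""
    return task.execute(limit=10)


task = FakeSQLTask()
# Replace execute on this instance only for the duration of the with-block.
with mock.patch.object(task, "execute", return_value="bq://project:dataset.table") as patched:
    result = run_pipeline(task)
```

Outside the `with` block the original `execute` is restored, so an unmocked call raises again.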