From b2ede7dd51786097f371c24ca9a422a60cdc3475 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 29 Oct 2024 11:45:54 -0500 Subject: [PATCH] [serve] Simplify application build codepath (#48218) Stacked on: https://github.com/ray-project/ray/pull/48223 Reimplements the `.bind()` codepath to avoid using the Ray DAG codepath which adds a lot of complexity for little benefit. The DAG traversal code is much simpler, around 100 lines of self-contained code in `build_app.py`. I've also added unit tests for it. There should be no behavior change here. --------- Signed-off-by: Edward Oakes --- python/ray/serve/_private/api.py | 10 +- .../ray/serve/_private/application_state.py | 15 +- python/ray/serve/_private/build_app.py | 164 ++++++++++ python/ray/serve/_private/client.py | 63 ++-- .../_private/deployment_function_node.py | 85 ------ .../serve/_private/deployment_graph_build.py | 217 -------------- .../serve/_private/deployment_scheduler.py | 21 +- python/ray/serve/_private/deployment_state.py | 10 + python/ray/serve/api.py | 53 +--- python/ray/serve/deployment.py | 61 +--- python/ray/serve/scripts.py | 10 +- python/ray/serve/tests/BUILD | 1 - python/ray/serve/tests/test_api.py | 37 +-- python/ray/serve/tests/test_deploy.py | 2 +- python/ray/serve/tests/test_deploy_2.py | 5 +- .../test_deployment_graph_handle_serde.py | 59 ---- .../ray/serve/tests/test_model_composition.py | 41 +-- python/ray/serve/tests/unit/test_build_app.py | 281 ++++++++++++++++++ .../serve/tests/unit/test_deployment_class.py | 42 --- 19 files changed, 573 insertions(+), 604 deletions(-) create mode 100644 python/ray/serve/_private/build_app.py delete mode 100644 python/ray/serve/_private/deployment_function_node.py delete mode 100644 python/ray/serve/_private/deployment_graph_build.py delete mode 100644 python/ray/serve/tests/test_deployment_graph_handle_serde.py create mode 100644 python/ray/serve/tests/unit/test_build_app.py diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 97641dc47f32b..954d71cac10d4 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -240,16 +240,16 @@ def serve_start( return client -def call_app_builder_with_args_if_necessary( +def call_user_app_builder_with_args_if_necessary( builder: Union[Application, FunctionType], args: Dict[str, Any], ) -> Application: - """Builds a Serve application from an application builder function. + """Calls a user-provided function that returns Serve application. - If a pre-built application is passed, this is a no-op. + If an Application object is passed, this is a no-op. - Else, we validate the signature of the builder, convert the args dictionary to - the user-annotated Pydantic model if provided, and call the builder function. + Else, we validate the signature of the function, convert the args dictionary to + the user-annotated Pydantic model if provided, and call the function. The output of the function is returned (must be an Application). """ diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index 55d273fd7cb46..572b265230d7e 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -12,6 +12,7 @@ from ray import cloudpickle from ray._private.utils import import_attr from ray.exceptions import RuntimeEnvSetupError +from ray.serve._private.build_app import BuiltApplication, build_app from ray.serve._private.common import ( DeploymentID, DeploymentStatus, @@ -1141,20 +1142,20 @@ def build_serve_application( ) try: - from ray.serve._private.api import call_app_builder_with_args_if_necessary - from ray.serve._private.deployment_graph_build import build as pipeline_build + from ray.serve._private.api import call_user_app_builder_with_args_if_necessary # Import and build the application. args_info_str = f" with arguments {args}" if args else "" logger.info(f"Importing application '{name}'{args_info_str}.") - app = call_app_builder_with_args_if_necessary(import_attr(import_path), args) - deployments = pipeline_build(app._get_internal_dag_node(), name) - ingress = deployments[-1] + app = call_user_app_builder_with_args_if_necessary( + import_attr(import_path), args + ) deploy_args_list = [] - for deployment in deployments: - is_ingress = deployment.name == ingress.name + built_app: BuiltApplication = build_app(app, name=name) + for deployment in built_app.deployments: + is_ingress = deployment.name == built_app.ingress_deployment_name deploy_args_list.append( get_deploy_args( name=deployment._name, diff --git a/python/ray/serve/_private/build_app.py b/python/ray/serve/_private/build_app.py new file mode 100644 index 0000000000000..6108dd8bc0d11 --- /dev/null +++ b/python/ray/serve/_private/build_app.py @@ -0,0 +1,164 @@ +import logging +from dataclasses import dataclass +from typing import Generic, List, TypeVar + +from ray.dag.py_obj_scanner import _PyObjScanner +from ray.serve._private.constants import SERVE_LOGGER_NAME +from ray.serve.deployment import Application, Deployment +from ray.serve.handle import DeploymentHandle + +logger = logging.getLogger(SERVE_LOGGER_NAME) + +K = TypeVar("K") +V = TypeVar("V") + + +class IDDict(dict, Generic[K, V]): + """Dictionary that uses id() for keys instead of hash(). + + This is necessary because Application objects aren't hashable and we want each + instance to map to a unique key. + """ + + def __getitem__(self, key: K) -> V: + return super().__getitem__(id(key)) + + def __setitem__(self, key: K, value: V): + return super().__setitem__(id(key), value) + + def __delitem__(self, key: K): + return super().__delitem__(id(key)) + + def __contains__(self, key: object): + return super().__contains__(id(key)) + + +@dataclass(frozen=True) +class BuiltApplication: + # Name of the application. + name: str + # Name of the application's 'ingress' deployment + # (the one exposed over gRPC/HTTP/handle). + ingress_deployment_name: str + # List of unique deployments comprising the app. + deployments: List[Deployment] + + +def build_app( + app: Application, + *, + name: str, +) -> BuiltApplication: + """Builds the application into a list of finalized deployments. + + The following transformations are made: + - Application objects in constructor args/kwargs are converted to + DeploymentHandles for injection at runtime. + - Name conflicts from deployments that use the same class are handled + by appending a monotonically increasing suffix (e.g., SomeClass_1). + + Returns: BuiltApplication + """ + deployments = _build_app_recursive( + app, + app_name=name, + handles=IDDict(), + deployment_names=IDDict(), + ) + return BuiltApplication( + name=name, + ingress_deployment_name=app._bound_deployment.name, + deployments=deployments, + ) + + +def _build_app_recursive( + app: Application, + *, + app_name: str, + deployment_names: IDDict[Application, str], + handles: IDDict[Application, DeploymentHandle], +) -> List[Deployment]: + """Recursively traverses the graph of Application objects. + + Each Application will have an associated DeploymentHandle created that will replace + it in any occurrences in other Applications' args or kwargs. + + Also collects a list of the unique Applications encountered and returns them as + deployable Deployment objects. + """ + # This application has already been encountered. + # There's no need to recurse into its child args and we don't want to create + # a duplicate entry for it in the list of deployments. + if app in handles: + return [] + + # Create the DeploymentHandle that will be used to replace this application + # in the arguments of its parent(s). + handles[app] = DeploymentHandle( + _get_unique_deployment_name_memoized(app, deployment_names), + app_name=app_name, + ) + + deployments = [] + scanner = _PyObjScanner(source_type=Application) + try: + # Recursively traverse any Application objects bound to init args/kwargs. + child_apps = scanner.find_nodes( + (app._bound_deployment.init_args, app._bound_deployment.init_kwargs) + ) + for child_app in child_apps: + deployments.extend( + _build_app_recursive( + child_app, + app_name=app_name, + handles=handles, + deployment_names=deployment_names, + ) + ) + + # Replace Application objects with their corresponding DeploymentHandles. + new_init_args, new_init_kwargs = scanner.replace_nodes(handles) + deployments.append( + app._bound_deployment.options( + name=_get_unique_deployment_name_memoized(app, deployment_names), + _init_args=new_init_args, + _init_kwargs=new_init_kwargs, + ) + ) + return deployments + finally: + scanner.clear() + + +def _get_unique_deployment_name_memoized( + app: Application, deployment_names: IDDict[Application, str] +) -> str: + """Generates a name for the deployment. + + This is used to handle collisions when the user does not specify a name + explicitly, so typically we'd use the class name as the default. + + In that case, we append a monotonically increasing suffix to the name, e.g., + Deployment, then Deployment_1, then Deployment_2, ... + + Names are memoized in the `deployment_names` dict, which should be passed to + subsequent calls to this function. + """ + if app in deployment_names: + return deployment_names[app] + + idx = 1 + name = app._bound_deployment.name + while name in deployment_names.values(): + name = f"{app._bound_deployment.name}_{idx}" + idx += 1 + + if idx != 1: + logger.warning( + "There are multiple deployments with the same name " + f"'{app._bound_deployment.name}'. Renaming one to '{name}'." + ) + + deployment_names[app] = name + return name diff --git a/python/ray/serve/_private/client.py b/python/ray/serve/_private/client.py index 31819a6f43acb..6741103380d10 100644 --- a/python/ray/serve/_private/client.py +++ b/python/ray/serve/_private/client.py @@ -7,6 +7,7 @@ import ray from ray.actor import ActorHandle from ray.serve._private.application_state import StatusOverview +from ray.serve._private.build_app import BuiltApplication from ray.serve._private.common import ( DeploymentID, DeploymentStatus, @@ -23,6 +24,7 @@ from ray.serve._private.controller import ServeController from ray.serve._private.deploy_utils import get_deploy_args from ray.serve._private.deployment_info import DeploymentInfo +from ray.serve._private.utils import get_random_string from ray.serve.config import HTTPOptions from ray.serve.exceptions import RayServeException from ray.serve.generated.serve_pb2 import DeploymentArgs, DeploymentRoute @@ -246,24 +248,26 @@ def _wait_for_application_running(self, name: str, timeout_s: int = -1): @_ensure_connected def deploy_application( self, - name, - deployments: List[Dict], - _blocking: bool = True, - ): - ingress_route_prefix = None + built_app: BuiltApplication, + *, + blocking: bool, + route_prefix: Optional[str], + logging_config: Optional[Union[Dict, LoggingConfig]], + ) -> DeploymentHandle: deployment_args_list = [] - for deployment in deployments: - if deployment["ingress"]: - ingress_route_prefix = deployment["route_prefix"] + for deployment in built_app.deployments: + if deployment.logging_config is None and logging_config: + deployment = deployment.options(logging_config=logging_config) + is_ingress = deployment.name == built_app.ingress_deployment_name deployment_args = get_deploy_args( - deployment["name"], - replica_config=deployment["replica_config"], - ingress=deployment["ingress"], - deployment_config=deployment["deployment_config"], - version=deployment["version"], - route_prefix=deployment["route_prefix"], - docs_path=deployment["docs_path"], + deployment.name, + ingress=is_ingress, + replica_config=deployment._replica_config, + deployment_config=deployment._deployment_config, + version=deployment._version or get_random_string(), + route_prefix=route_prefix if is_ingress else None, + docs_path=deployment._docs_path, ) deployment_args_proto = DeploymentArgs() @@ -283,14 +287,31 @@ def deploy_application( deployment_args_list.append(deployment_args_proto.SerializeToString()) - ray.get(self._controller.deploy_application.remote(name, deployment_args_list)) - if _blocking: - self._wait_for_application_running(name) - if ingress_route_prefix is not None: - url_part = " at " + self._root_url + ingress_route_prefix + ray.get( + self._controller.deploy_application.remote( + built_app.name, deployment_args_list + ) + ) + + # The deployment state is not guaranteed to be created after + # deploy_application returns; the application state manager will + # need another reconcile iteration to create it. + self._wait_for_deployment_created( + built_app.ingress_deployment_name, built_app.name + ) + handle = self.get_handle( + built_app.ingress_deployment_name, built_app.name, check_exists=False + ) + + if blocking: + self._wait_for_application_running(built_app.name) + if route_prefix is not None: + url_part = " at " + self._root_url + route_prefix else: url_part = "" - logger.info(f"Application '{name}' is ready{url_part}.") + logger.info(f"Application '{built_app.name}' is ready{url_part}.") + + return handle @_ensure_connected def deploy_apps( diff --git a/python/ray/serve/_private/deployment_function_node.py b/python/ray/serve/_private/deployment_function_node.py deleted file mode 100644 index 031faf910e46d..0000000000000 --- a/python/ray/serve/_private/deployment_function_node.py +++ /dev/null @@ -1,85 +0,0 @@ -from typing import Any, Callable, Dict, List, Union - -from ray.dag.dag_node import DAGNode -from ray.dag.format_utils import get_dag_node_str -from ray.serve._private.config import DeploymentConfig, ReplicaConfig -from ray.serve.deployment import Deployment, schema_to_deployment -from ray.serve.handle import DeploymentHandle -from ray.serve.schema import DeploymentSchema - - -class DeploymentFunctionNode(DAGNode): - """Represents a function node decorated by @serve.deployment in a serve DAG.""" - - def __init__( - self, - func_body: Union[Callable, str], - deployment_name, - app_name, - func_args, - func_kwargs, - func_options, - other_args_to_resolve=None, - ): - self._body = func_body - self._deployment_name = deployment_name - self._app_name = app_name - super().__init__( - func_args, - func_kwargs, - func_options, - other_args_to_resolve=other_args_to_resolve, - ) - if "deployment_schema" in self._bound_other_args_to_resolve: - deployment_schema: DeploymentSchema = self._bound_other_args_to_resolve[ - "deployment_schema" - ] - deployment_shell = schema_to_deployment(deployment_schema) - - self._deployment = deployment_shell.options( - func_or_class=func_body, - name=self._deployment_name, - _init_args=(), - _init_kwargs={}, - _internal=True, - ) - else: - replica_config = ReplicaConfig.create( - deployment_def=func_body, - init_args=tuple(), - init_kwargs=dict(), - ray_actor_options=func_options, - ) - self._deployment: Deployment = Deployment( - deployment_name, - deployment_config=DeploymentConfig(), - replica_config=replica_config, - _internal=True, - ) - - self._deployment_handle = DeploymentHandle( - self._deployment.name, self._app_name - ) - - def _copy_impl( - self, - new_args: List[Any], - new_kwargs: Dict[str, Any], - new_options: Dict[str, Any], - new_other_args_to_resolve: Dict[str, Any], - ): - return DeploymentFunctionNode( - self._body, - self._deployment_name, - self._app_name, - new_args, - new_kwargs, - new_options, - other_args_to_resolve=new_other_args_to_resolve, - ) - - def __str__(self) -> str: - return get_dag_node_str(self, str(self._body)) - - def get_deployment_name(self): - return self._deployment_name diff --git a/python/ray/serve/_private/deployment_graph_build.py b/python/ray/serve/_private/deployment_graph_build.py deleted file mode 100644 index 56ec3fe4ce651..0000000000000 --- a/python/ray/serve/_private/deployment_graph_build.py +++ /dev/null @@ -1,217 +0,0 @@ -import inspect -from collections import OrderedDict -from typing import List - -from ray.dag import ClassNode, DAGNode -from ray.dag.function_node import FunctionNode -from ray.dag.utils import _DAGNodeNameGenerator -from ray.experimental.gradio_utils import type_to_string -from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME -from ray.serve._private.deployment_function_node import DeploymentFunctionNode -from ray.serve._private.deployment_node import DeploymentNode -from ray.serve.deployment import Deployment, schema_to_deployment -from ray.serve.handle import DeploymentHandle -from ray.serve.schema import DeploymentSchema - - -def build( - ray_dag_root_node: DAGNode, name: str = SERVE_DEFAULT_APP_NAME -) -> List[Deployment]: - """Do all the DAG transformation, extraction and generation needed to - produce a runnable and deployable serve pipeline application from a valid - DAG authored with Ray DAG API. - - This should be the only user facing API that user interacts with. - - Assumptions: - Following enforcements are only applied at generating and applying - pipeline artifact, but not blockers for local development and testing. - - - ALL args and kwargs used in DAG building should be JSON serializable. - This means in order to ensure your pipeline application can run on - a remote cluster potentially with different runtime environment, - among all options listed: - - 1) binding in-memory objects - 2) Rely on pickling - 3) Enforce JSON serialibility on all args used - - We believe both 1) & 2) rely on unstable in-memory objects or - cross version pickling / closure capture, where JSON serialization - provides the right contract needed for proper deployment. - - - ALL classes and methods used should be visible on top of the file and - importable via a fully qualified name. Thus no inline class or - function definitions should be used. - - Args: - ray_dag_root_node: DAGNode acting as root of a Ray authored DAG. It - should be executable via `ray_dag_root_node.execute(user_input)` - and should have `InputNode` in it. - name: Application name,. If provided, formatting all the deployment name to - {name}_{deployment_name}, if not provided, the deployment name won't be - updated. - - Returns: - deployments: All deployments needed for an e2e runnable serve pipeline, - accessible via python .remote() call. - - Examples: - - .. code-block:: python - - with InputNode() as dag_input: - m1 = Model.bind(1) - m2 = Model.bind(2) - m1_output = m1.forward.bind(dag_input[0]) - m2_output = m2.forward.bind(dag_input[1]) - ray_dag = ensemble.bind(m1_output, m2_output) - - Assuming we have non-JSON serializable or inline defined class or - function in local pipeline development. - - .. code-block:: python - - from ray.serve.api import build as build_app - deployments = build_app(ray_dag) # it can be method node - deployments = build_app(m1) # or just a regular node. - """ - with _DAGNodeNameGenerator() as node_name_generator: - serve_root_dag = ray_dag_root_node.apply_recursive( - lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator, name) - ) - deployments = extract_deployments_from_serve_dag(serve_root_dag) - - # If the ingress deployment is a function and it is bound to other deployments, - # reject. - if isinstance(serve_root_dag, DeploymentFunctionNode) and len(deployments) != 1: - raise ValueError( - "The ingress deployment to your application cannot be a function if there " - "are multiple deployments. If you want to compose them, use a class. If " - "you're using the DAG API, the function should be bound to a DAGDriver." - ) - - # The last deployment in the list is the ingress. - return deployments - - -def transform_ray_dag_to_serve_dag( - dag_node: DAGNode, node_name_generator: _DAGNodeNameGenerator, app_name: str -): - """ - Transform a Ray DAG to a Serve DAG. Map ClassNode to DeploymentNode with - ray decorated body passed in. - """ - if isinstance(dag_node, ClassNode): - deployment_name = node_name_generator.get_node_name(dag_node) - ( - replaced_deployment_init_args, - replaced_deployment_init_kwargs, - ) = dag_node.apply_functional( - [dag_node.get_args(), dag_node.get_kwargs()], - predictate_fn=lambda node: isinstance( - node, - # We need to match and replace all DAGNodes even though they - # could be None, because no DAGNode replacement should run into - # re-resolved child DAGNodes, otherwise with KeyError - ( - DeploymentNode, - DeploymentFunctionNode, - ), - ), - apply_fn=lambda node: DeploymentHandle(node._deployment.name, app_name), - ) - - # ClassNode is created via bind on serve.deployment decorated class - # with no serve specific configs. - deployment_schema: DeploymentSchema = dag_node._bound_other_args_to_resolve[ - "deployment_schema" - ] - - deployment_shell: Deployment = schema_to_deployment(deployment_schema) - - # Prefer user specified name to override the generated one. - if ( - inspect.isclass(dag_node._body) - and deployment_shell.name != dag_node._body.__name__ - ): - deployment_name = deployment_shell.name - - deployment = deployment_shell.options( - func_or_class=dag_node._body, - name=deployment_name, - _init_args=replaced_deployment_init_args, - _init_kwargs=replaced_deployment_init_kwargs, - _internal=True, - ) - - return DeploymentNode( - deployment, - app_name, - dag_node.get_args(), - dag_node.get_kwargs(), - dag_node.get_options(), - other_args_to_resolve=dag_node.get_other_args_to_resolve(), - ) - elif isinstance( - dag_node, - FunctionNode - # TODO (jiaodong): We do not convert ray function to deployment function - # yet, revisit this later - ) and dag_node.get_other_args_to_resolve().get("is_from_serve_deployment"): - deployment_name = node_name_generator.get_node_name(dag_node) - - other_args_to_resolve = dag_node.get_other_args_to_resolve() - if "return" in dag_node._body.__annotations__: - other_args_to_resolve["result_type_string"] = type_to_string( - dag_node._body.__annotations__["return"] - ) - - # Set the deployment name if the user provides. - if "deployment_schema" in dag_node._bound_other_args_to_resolve: - schema = dag_node._bound_other_args_to_resolve["deployment_schema"] - if ( - inspect.isfunction(dag_node._body) - and schema.name != dag_node._body.__name__ - ): - deployment_name = schema.name - - return DeploymentFunctionNode( - dag_node._body, - deployment_name, - app_name, - dag_node.get_args(), - dag_node.get_kwargs(), - dag_node.get_options(), - other_args_to_resolve=other_args_to_resolve, - ) - else: - # TODO: (jiaodong) Support FunctionNode or leave it as ray task - return dag_node - - -def extract_deployments_from_serve_dag( - serve_dag_root: DAGNode, -) -> List[Deployment]: - """Extract deployment python objects from a transformed serve DAG. Should - only be called after `transform_ray_dag_to_serve_dag`, otherwise nothing - to return. - - Args: - serve_dag_root: Transformed serve dag root node. - Returns: - deployments (List[Deployment]): List of deployment python objects - fetched from serve dag. - """ - deployments = OrderedDict() - - def extractor(dag_node): - if isinstance(dag_node, (DeploymentNode, DeploymentFunctionNode)): - deployment = dag_node._deployment - # In case same deployment is used in multiple DAGNodes - deployments[deployment.name] = deployment - return dag_node - - serve_dag_root.apply_recursive(extractor) - - return list(deployments.values()) diff --git a/python/ray/serve/_private/deployment_scheduler.py b/python/ray/serve/_private/deployment_scheduler.py index 221e5aa9b1c6a..43a356ec36f56 100644 --- a/python/ray/serve/_private/deployment_scheduler.py +++ b/python/ray/serve/_private/deployment_scheduler.py @@ -115,6 +115,7 @@ class ReplicaSchedulingRequestStatus(str, Enum): IN_PROGRESS = "IN_PROGRESS" SUCCEEDED = "SUCCEEDED" + ACTOR_CREATION_FAILED = "ACTOR_CREATION_FAILED" PLACEMENT_GROUP_CREATION_FAILED = "PLACEMENT_GROUP_CREATION_FAILED" @@ -560,8 +561,7 @@ def _schedule_replica( # make progress even if the placement group isn't created. # See https://github.com/ray-project/ray/issues/43888. logger.exception( - "Replica scheduling failed. Failed to create a " - f"placement group for {replica_id}." + f"Failed to create a placement group for {replica_id}." ) scheduling_request.status = ( ReplicaSchedulingRequestStatus.PLACEMENT_GROUP_CREATION_FAILED @@ -595,10 +595,19 @@ def _schedule_replica( f"{deployment_id.app_name}:{deployment_id.name}" ] = (1.0 / scheduling_request.max_replicas_per_node) - actor_handle = scheduling_request.actor_def.options( - scheduling_strategy=scheduling_strategy, - **actor_options, - ).remote(*scheduling_request.actor_init_args) + try: + actor_handle = scheduling_request.actor_def.options( + scheduling_strategy=scheduling_strategy, + **actor_options, + ).remote(*scheduling_request.actor_init_args) + except Exception: + # We add a defensive exception here, so the controller can + # make progress even if the actor options are misconfigured. + logger.exception(f"Failed to create an actor for {replica_id}.") + scheduling_request.status = ( + ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED + ) + return del self._pending_replicas[deployment_id][replica_id] self._on_replica_launching( diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 69ac41d68102e..ca7d52097c2c9 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -2751,6 +2751,16 @@ def _handle_scheduling_request_failures( f"group for replica {scheduling_request.replica_id}. " "See Serve controller logs for more details." ) + elif ( + scheduling_request.status + == ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED + ): + failed_replicas.append(scheduling_request.replica_id) + self._deployment_states[deployment_id].record_replica_startup_failure( + "Replica scheduling failed. Failed to create an actor " + f"for replica {scheduling_request.replica_id}. " + "See Serve controller logs for more details." + ) if failed_replicas: self._deployment_states[deployment_id].stop_replicas(failed_replicas) self._deployment_states[deployment_id].update_replica_startup_backoff_time() diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 512c328f134de..3a46722039fb4 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -10,14 +10,13 @@ import ray from ray import cloudpickle from ray._private.serialization import pickle_dumps -from ray.dag import DAGNode +from ray.serve._private.build_app import build_app from ray.serve._private.config import ( DeploymentConfig, ReplicaConfig, handle_num_replicas_auto, ) from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_LOGGER_NAME -from ray.serve._private.deployment_graph_build import build as pipeline_build from ray.serve._private.http_util import ( ASGIAppReplicaWrapper, make_fastapi_class_based_view, @@ -28,7 +27,6 @@ Default, ensure_serialization_context, extract_self_if_method_call, - get_random_string, validate_route_prefix, ) from ray.serve.config import ( @@ -448,46 +446,17 @@ def _run( # Record after Ray has been started. ServeUsageTag.API_VERSION.record("v2") - if isinstance(target, Application): - deployments = pipeline_build(target._get_internal_dag_node(), name) - ingress_deployment_name = deployments[-1].name - else: - msg = "`serve.run` expects an `Application` returned by `Deployment.bind()`." - if isinstance(target, DAGNode): - msg += ( - " If you are using the DAG API, you must bind the DAG node to a " - "deployment like: `app = Deployment.bind(my_dag_output)`. " - ) - raise TypeError(msg) - - parameter_group = [] - for deployment in deployments: - is_ingress = deployment._name == ingress_deployment_name - if deployment.logging_config is None and logging_config: - deployment = deployment.options(logging_config=logging_config) - - deployment_parameters = { - "name": deployment._name, - "replica_config": deployment._replica_config, - "deployment_config": deployment._deployment_config, - "version": deployment._version or get_random_string(), - "route_prefix": route_prefix if is_ingress else None, - "docs_path": deployment._docs_path, - "ingress": is_ingress, - } - parameter_group.append(deployment_parameters) - - client.deploy_application( - name, - parameter_group, - _blocking=_blocking, - ) + if not isinstance(target, Application): + raise TypeError( + "`serve.run` expects an `Application` returned by `Deployment.bind()`." + ) - # The deployment state is not guaranteed to be created after - # deploy_application returns; the application state manager will - # need another reconcile iteration to create it. - client._wait_for_deployment_created(ingress_deployment_name, name) - return client.get_handle(ingress_deployment_name, name, check_exists=False) + return client.deploy_application( + build_app(target, name=name), + blocking=_blocking, + route_prefix=route_prefix, + logging_config=logging_config, + ) @PublicAPI(stability="stable") diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index e621cf88434da..3a03557ad818e 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -3,9 +3,6 @@ from copy import deepcopy from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from ray.dag.class_node import ClassNode -from ray.dag.dag_node import DAGNodeBase -from ray.dag.function_node import FunctionNode from ray.serve._private.config import ( DeploymentConfig, ReplicaConfig, @@ -22,7 +19,7 @@ @PublicAPI(stability="stable") -class Application(DAGNodeBase): +class Application: """One or more deployments bound with arguments that can be deployed together. Can be passed into another `Deployment.bind()` to compose multiple deployments in a @@ -58,28 +55,9 @@ class MyDeployment: """ - def __init__( - self, *, _internal_dag_node: Optional[Union[ClassNode, FunctionNode]] = None - ): - if _internal_dag_node is None: - raise RuntimeError("This class should not be constructed directly.") - - self._internal_dag_node = _internal_dag_node - - def _get_internal_dag_node(self) -> Union[ClassNode, FunctionNode]: - if self._internal_dag_node is None: - raise RuntimeError("Application object should not be constructed directly.") - - return self._internal_dag_node - - @classmethod - def _from_internal_dag_node(cls, dag_node: Union[ClassNode, FunctionNode]): - return cls(_internal_dag_node=dag_node) - - # Proxy all method calls to the underlying DAG node. This allows this class to be - # passed in place of the ClassNode or FunctionNode in the DAG building code. - def __getattr__(self, name: str) -> Any: - return getattr(self._get_internal_dag_node(), name) + def __init__(self, bound_deployment: "Deployment"): + # This is used by `build_app`, but made private so users don't use it. + self._bound_deployment = bound_deployment @PublicAPI(stability="stable") @@ -223,32 +201,7 @@ def bind(self, *args, **kwargs) -> Application: The returned Application can be deployed using `serve.run` (or via config file) or bound to another deployment for composition. """ - - schema_shell = deployment_to_schema(self) - if inspect.isfunction(self.func_or_class): - dag_node = FunctionNode( - self.func_or_class, - args, # Used to bind and resolve DAG only, can take user input - kwargs, # Used to bind and resolve DAG only, can take user input - self._replica_config.ray_actor_options or dict(), - other_args_to_resolve={ - "deployment_schema": schema_shell, - "is_from_serve_deployment": True, - }, - ) - else: - dag_node = ClassNode( - self.func_or_class, - args, - kwargs, - cls_options=self._replica_config.ray_actor_options or dict(), - other_args_to_resolve={ - "deployment_schema": schema_shell, - "is_from_serve_deployment": True, - }, - ) - - return Application._from_internal_dag_node(dag_node) + return Application(self.options(_init_args=args, _init_kwargs=kwargs)) def options( self, @@ -442,9 +395,7 @@ def __repr__(self): return str(self) -def deployment_to_schema( - d: Deployment, -) -> DeploymentSchema: +def deployment_to_schema(d: Deployment) -> DeploymentSchema: """Converts a live deployment object to a corresponding structured schema. Args: diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 270dc4a032b4c..098e0294fa564 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -19,6 +19,7 @@ from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args from ray.dashboard.modules.serve.sdk import ServeSubmissionClient from ray.serve._private import api as _private_api +from ray.serve._private.build_app import BuiltApplication, build_app from ray.serve._private.constants import ( DEFAULT_GRPC_PORT, DEFAULT_HTTP_HOST, @@ -26,7 +27,6 @@ SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE, ) -from ray.serve._private.deployment_graph_build import build as pipeline_build from ray.serve.config import DeploymentMode, ProxyLocation, gRPCOptions from ray.serve.deployment import Application, deployment_to_schema from ray.serve.schema import ( @@ -492,7 +492,7 @@ def run( is_config = False import_path = config_or_import_path cli_logger.print(f"Running import path: '{import_path}'.") - app = _private_api.call_app_builder_with_args_if_necessary( + app = _private_api.call_user_app_builder_with_args_if_necessary( import_attr(import_path), args_dict ) @@ -561,7 +561,7 @@ def run( try: # The module needs to be reloaded with `importlib` in order to # pick up any changes. - app = _private_api.call_app_builder_with_args_if_necessary( + app = _private_api.call_user_app_builder_with_args_if_necessary( import_attr(import_path, reload_module=True), args_dict ) serve.run( @@ -794,13 +794,13 @@ def build_app_config(import_path: str, name: str = None): f"Expected '{import_path}' to be an Application but got {type(app)}." ) - deployments = pipeline_build(app, name) + built_app: BuiltApplication = build_app(app, name=name) schema = ServeApplicationSchema( name=name, route_prefix="/" if len(import_paths) == 1 else f"/{name}", import_path=import_path, runtime_env={}, - deployments=[deployment_to_schema(d) for d in deployments], + deployments=[deployment_to_schema(d) for d in built_app.deployments], ) return schema.dict(exclude_unset=True) diff --git a/python/ray/serve/tests/BUILD b/python/ray/serve/tests/BUILD index cf0107f757a6e..1b047da86a5dd 100644 --- a/python/ray/serve/tests/BUILD +++ b/python/ray/serve/tests/BUILD @@ -23,7 +23,6 @@ py_test_module_list( "test_cluster_node_info_cache.py", "test_constructor_failure.py", "test_controller.py", - "test_deployment_graph_handle_serde.py", "test_deployment_version.py", "test_expected_versions.py", "test_max_queued_requests.py", diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 36b87f8955f3e..ca99b9b7ee2d7 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -12,7 +12,7 @@ from ray import serve from ray._private.pydantic_compat import BaseModel, ValidationError from ray._private.test_utils import SignalActor, wait_for_condition -from ray.serve._private.api import call_app_builder_with_args_if_necessary +from ray.serve._private.api import call_user_app_builder_with_args_if_necessary from ray.serve._private.common import DeploymentID from ray.serve._private.constants import ( DEFAULT_MAX_ONGOING_REQUESTS, @@ -566,16 +566,16 @@ class TypedArgs(BaseModel): def test_prebuilt_app(self): a = self.A.bind() - assert call_app_builder_with_args_if_necessary(a, {}) == a + assert call_user_app_builder_with_args_if_necessary(a, {}) == a f = self.f.bind() - assert call_app_builder_with_args_if_necessary(f, {}) == f + assert call_user_app_builder_with_args_if_necessary(f, {}) == f with pytest.raises( ValueError, match="Arguments can only be passed to an application builder function", ): - call_app_builder_with_args_if_necessary(f, {"key": "val"}) + call_user_app_builder_with_args_if_necessary(f, {"key": "val"}) def test_invalid_builder(self): class ThisShouldBeAFunction: @@ -588,7 +588,7 @@ class ThisShouldBeAFunction: "or an application builder function" ), ): - call_app_builder_with_args_if_necessary(ThisShouldBeAFunction, {}) + call_user_app_builder_with_args_if_necessary(ThisShouldBeAFunction, {}) def test_invalid_signature(self): def builder_with_two_args(args1, args2): @@ -598,7 +598,7 @@ def builder_with_two_args(args1, args2): TypeError, match="Application builder functions should take exactly one parameter", ): - call_app_builder_with_args_if_necessary(builder_with_two_args, {}) + call_user_app_builder_with_args_if_necessary(builder_with_two_args, {}) def test_builder_returns_bad_type(self): def return_none(args): @@ -608,7 +608,7 @@ def return_none(args): TypeError, match="Application builder functions must return a", ): - call_app_builder_with_args_if_necessary(return_none, {}) + call_user_app_builder_with_args_if_necessary(return_none, {}) def return_unbound_deployment(args): return self.f @@ -617,21 +617,22 @@ def return_unbound_deployment(args): TypeError, match="Application builder functions must return a", ): - call_app_builder_with_args_if_necessary(return_unbound_deployment, {}) + call_user_app_builder_with_args_if_necessary(return_unbound_deployment, {}) def test_basic_no_args(self): def build_function(args): return self.A.bind() assert isinstance( - call_app_builder_with_args_if_necessary(build_function, {}), Application + call_user_app_builder_with_args_if_necessary(build_function, {}), + Application, ) def build_class(args): return self.f.bind() assert isinstance( - call_app_builder_with_args_if_necessary(build_class, {}), Application + call_user_app_builder_with_args_if_necessary(build_class, {}), Application ) def test_args_dict(self): @@ -645,7 +646,7 @@ def build(args): args["message"] ) - app = call_app_builder_with_args_if_necessary(build, args_dict) + app = call_user_app_builder_with_args_if_necessary(build, args_dict) assert isinstance(app, Application) def test_args_typed(self): @@ -658,7 +659,7 @@ def build(args): args["message"] ) - app = call_app_builder_with_args_if_necessary(build, args_dict) + app = call_user_app_builder_with_args_if_necessary(build, args_dict) assert isinstance(app, Application) def build(args: Dict[str, str]): @@ -668,7 +669,7 @@ def build(args: Dict[str, str]): args["message"] ) - app = call_app_builder_with_args_if_necessary(build, args_dict) + app = call_user_app_builder_with_args_if_necessary(build, args_dict) assert isinstance(app, Application) class ForwardRef: @@ -679,7 +680,7 @@ def build(args: "ForwardRef"): args["message"] ) - app = call_app_builder_with_args_if_necessary(ForwardRef.build, args_dict) + app = call_user_app_builder_with_args_if_necessary(ForwardRef.build, args_dict) assert isinstance(app, Application) def build(args: self.TypedArgs): @@ -690,7 +691,7 @@ def build(args: self.TypedArgs): assert args.num_replicas == 3 return self.A.options(num_replicas=args.num_replicas).bind(args.message) - app = call_app_builder_with_args_if_necessary(build, args_dict) + app = call_user_app_builder_with_args_if_necessary(build, args_dict) assert isinstance(app, Application) # Sanity check that pydantic validation works. @@ -701,7 +702,7 @@ def check_missing_optional(args: self.TypedArgs): assert args.num_replicas is None return self.A.bind() - app = call_app_builder_with_args_if_necessary( + app = call_user_app_builder_with_args_if_necessary( check_missing_optional, {"message": "hiya"} ) assert isinstance(app, Application) @@ -711,7 +712,7 @@ def check_missing_required(args: self.TypedArgs): assert False, "Shouldn't get here because validation failed." with pytest.raises(ValidationError, match="field required"): - call_app_builder_with_args_if_necessary( + call_user_app_builder_with_args_if_necessary( check_missing_required, {"num_replicas": "10"} ) @@ -742,7 +743,7 @@ def build(args: Cat): assert args.age == cat_dict["age"] return self.A.bind(f"My {args.color} cat is {args.age} years old.") - app = call_app_builder_with_args_if_necessary(build, cat_dict) + app = call_user_app_builder_with_args_if_necessary(build, cat_dict) assert isinstance(app, Application) diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 9b8a92551fbef..769bff110093d 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -92,7 +92,7 @@ def reconfigure(self, config): def __call__(self, *args): return self.config - with pytest.raises(ValidationError): + with pytest.raises(RuntimeError): serve.run(A.options(user_config="hi").bind()) diff --git a/python/ray/serve/tests/test_deploy_2.py b/python/ray/serve/tests/test_deploy_2.py index 9816e4ca6badf..5cab47ab9e62b 100644 --- a/python/ray/serve/tests/test_deploy_2.py +++ b/python/ray/serve/tests/test_deploy_2.py @@ -11,7 +11,6 @@ import ray from ray import serve -from ray._private.pydantic_compat import ValidationError from ray._private.test_utils import SignalActor, wait_for_condition from ray.serve._private.common import DeploymentStatus from ray.serve._private.logging_utils import get_serve_logs_dir @@ -36,9 +35,7 @@ def test_deployment_error_handling(serve_instance): def f(): pass - with pytest.raises( - ValidationError, match="1 validation error for RayActorOptionsSchema.*" - ): + with pytest.raises(RuntimeError): # This is an invalid configuration since dynamic upload of working # directories is not supported. The error this causes in the controller # code should be caught and reported back to the `deploy` caller. diff --git a/python/ray/serve/tests/test_deployment_graph_handle_serde.py b/python/ray/serve/tests/test_deployment_graph_handle_serde.py deleted file mode 100644 index c9e3e266e0386..0000000000000 --- a/python/ray/serve/tests/test_deployment_graph_handle_serde.py +++ /dev/null @@ -1,59 +0,0 @@ -import sys - -import pytest - -import ray -from ray import serve -from ray.serve._private.deployment_graph_build import build as pipeline_build -from ray.serve.dag import InputNode - - -@serve.deployment -def func(): - pass - - -@serve.deployment -class Driver: - def __init__(self, *args): - pass - - def __call__(self, *args): - pass - - -def test_environment_start(): - """Make sure that in the beginning ray hasn't been started""" - assert not ray.is_initialized() - - -def test_func_building(): - dag = func.bind() - assert len(pipeline_build(dag)) == 1 - - -def test_class_building(): - dag = Driver.bind() - assert len(pipeline_build(dag)) == 1 - - -def test_dag_building(): - dag = Driver.bind(func.bind()) - assert len(pipeline_build(dag)) == 2 - - -def test_nested_building(): - with InputNode() as inp: - out = func.bind(inp) - out = Driver.bind(func.bind()) - dag = Driver.bind(out) - assert len(pipeline_build(dag)) == 3 - - -def test_environment_end(): - """Make sure that in the end ray hasn't been started""" - assert not ray.is_initialized() - - -if __name__ == "__main__": - sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_model_composition.py b/python/ray/serve/tests/test_model_composition.py index 4a276599e60ef..f30951ee967e1 100644 --- a/python/ray/serve/tests/test_model_composition.py +++ b/python/ray/serve/tests/test_model_composition.py @@ -9,7 +9,6 @@ import starlette.requests from ray import serve -from ray.serve._private.deployment_graph_build import build as pipeline_build from ray.serve.handle import DeploymentHandle NESTED_HANDLE_KEY = "nested_handle" @@ -183,20 +182,6 @@ def test_single_node_deploy_success(serve_instance): assert handle.remote(41).result() == 42 -def test_options_and_names(serve_instance): - m1 = Adder.bind(1) - m1_built = pipeline_build(m1)[-1] - assert m1_built.name == "Adder" - - m1 = Adder.options(name="Adder2").bind(1) - m1_built = pipeline_build(m1)[-1] - assert m1_built.name == "Adder2" - - m1 = Adder.options(num_replicas=2).bind(1) - m1_built = pipeline_build(m1)[-1] - assert m1_built.num_replicas == 2 - - @serve.deployment class TakeHandle: def __init__(self, handle) -> None: @@ -312,40 +297,24 @@ def test_single_functional_node_base_case(serve_instance): assert requests.get("http://127.0.0.1:8000/").text == "1" -def test_unsupported_bind(): +def test_unsupported_remote(): @serve.deployment class Actor: def ping(self): return "hello" - with pytest.raises(AttributeError, match=r"\.bind\(\) cannot be used again on"): - _ = Actor.bind().bind() - - with pytest.raises(AttributeError, match=r"\.bind\(\) cannot be used again on"): - _ = Actor.bind().ping.bind().bind() - with pytest.raises( - AttributeError, - match=r"\.remote\(\) cannot be used on ClassMethodNodes", + AttributeError, match=r"\'Application\' object has no attribute \'remote\'" ): - actor = Actor.bind() - _ = actor.ping.remote() - - -def test_unsupported_remote(): - @serve.deployment - class Actor: - def ping(self): - return "hello" - - with pytest.raises(AttributeError, match=r"\'Actor\' has no attribute \'remote\'"): _ = Actor.bind().remote() @serve.deployment def func(): return 1 - with pytest.raises(AttributeError, match=r"\.remote\(\) cannot be used on"): + with pytest.raises( + AttributeError, match=r"\'Application\' object has no attribute \'remote\'" + ): _ = func.bind().remote() diff --git a/python/ray/serve/tests/unit/test_build_app.py b/python/ray/serve/tests/unit/test_build_app.py new file mode 100644 index 0000000000000..8dd241fa8f98a --- /dev/null +++ b/python/ray/serve/tests/unit/test_build_app.py @@ -0,0 +1,281 @@ +import sys +from typing import List +from unittest import mock + +import pytest + +from ray import serve +from ray.serve._private.build_app import BuiltApplication, build_app +from ray.serve.deployment import Application, Deployment +from ray.serve.handle import DeploymentHandle + + +@pytest.fixture(autouse=True) +def patch_handle_eq(): + """Patch DeploymentHandle.__eq__ to compare options we care about.""" + + def _patched_handle_eq(self, other): + return all( + [ + isinstance(other, type(self)), + self.deployment_id == other.deployment_id, + self.handle_options == other.handle_options, + ] + ) + + with mock.patch( + "ray.serve.handle._DeploymentHandleBase.__eq__", _patched_handle_eq + ): + yield + + +def _build_and_check( + app: Application, + *, + expected_ingress_name: str, + expected_deployments: List[Deployment], + app_name: str = "default", +): + built_app: BuiltApplication = build_app(app, name=app_name) + assert built_app.name == app_name + assert built_app.ingress_deployment_name == expected_ingress_name + assert len(built_app.deployments) == len(expected_deployments) + + for expected_deployment in expected_deployments: + generated_deployment = None + for d in built_app.deployments: + if d.name == expected_deployment.name: + generated_deployment = d + + assert generated_deployment is not None, ( + f"Expected a deployment with name '{expected_deployment.name}' " + "to be generated but none was found. All generated names: " + + str([d.name for d in built_app.deployments]) + ) + + assert expected_deployment == generated_deployment + + +def test_single_deployment_basic(): + @serve.deployment( + num_replicas=123, + max_ongoing_requests=10, + max_queued_requests=10, + ) + class D: + pass + + app = D.bind("hello world!", hello="world") + _build_and_check( + app, + expected_ingress_name="D", + expected_deployments=[ + D.options( + name="D", _init_args=("hello world!",), _init_kwargs={"hello": "world"} + ) + ], + ) + + +def test_single_deployment_custom_name(): + @serve.deployment( + num_replicas=123, + max_ongoing_requests=10, + max_queued_requests=10, + ) + class D: + pass + + app = D.options(name="foobar123").bind("hello world!", hello="world") + _build_and_check( + app, + expected_ingress_name="foobar123", + expected_deployments=[ + D.options( + name="foobar123", + _init_args=("hello world!",), + _init_kwargs={"hello": "world"}, + ) + ], + ) + + +def test_multi_deployment_basic(): + @serve.deployment(num_replicas=3) + class Inner: + pass + + @serve.deployment(num_replicas=1) + class Outer: + pass + + app = Outer.bind(Inner.bind(), other=Inner.options(name="Other").bind()) + _build_and_check( + app, + expected_ingress_name="Outer", + expected_deployments=[ + Inner.options(name="Inner", _init_args=tuple(), _init_kwargs={}), + Inner.options(name="Other", _init_args=tuple(), _init_kwargs={}), + Outer.options( + name="Outer", + _init_args=( + DeploymentHandle( + "Inner", + app_name="default", + ), + ), + _init_kwargs={ + "other": DeploymentHandle( + "Other", + app_name="default", + ), + }, + ), + ], + ) + + +def test_multi_deployment_handle_in_nested_obj(): + @serve.deployment(num_replicas=3) + class Inner: + pass + + @serve.deployment(num_replicas=1) + class Outer: + pass + + app = Outer.bind([Inner.bind()]) + _build_and_check( + app, + expected_ingress_name="Outer", + expected_deployments=[ + Inner.options(name="Inner", _init_args=tuple(), _init_kwargs={}), + Outer.options( + name="Outer", + _init_args=( + [ + DeploymentHandle( + "Inner", + app_name="default", + ), + ], + ), + _init_kwargs={}, + ), + ], + ) + + +def test_multi_deployment_custom_app_name(): + @serve.deployment(num_replicas=3) + class Inner: + pass + + @serve.deployment(num_replicas=1) + class Outer: + pass + + app = Outer.bind(Inner.bind()) + _build_and_check( + app, + app_name="custom", + expected_ingress_name="Outer", + expected_deployments=[ + Inner.options(name="Inner", _init_args=tuple(), _init_kwargs={}), + Outer.options( + name="Outer", + _init_args=( + DeploymentHandle( + "Inner", + app_name="custom", + ), + ), + _init_kwargs={}, + ), + ], + ) + + +def test_multi_deployment_name_collision(): + @serve.deployment + class Inner: + pass + + @serve.deployment + class Outer: + pass + + app = Outer.bind( + Inner.bind("arg1"), + Inner.bind("arg2"), + ) + _build_and_check( + app, + expected_ingress_name="Outer", + expected_deployments=[ + Inner.options(name="Inner", _init_args=("arg1",), _init_kwargs={}), + Inner.options(name="Inner_1", _init_args=("arg2",), _init_kwargs={}), + Outer.options( + name="Outer", + _init_args=( + DeploymentHandle( + "Inner", + app_name="default", + ), + DeploymentHandle( + "Inner_1", + app_name="default", + ), + ), + _init_kwargs={}, + ), + ], + ) + + +def test_multi_deployment_same_app_passed_twice(): + @serve.deployment + class Shared: + pass + + @serve.deployment(num_replicas=3) + class Inner: + pass + + @serve.deployment(num_replicas=1) + class Outer: + pass + + shared = Shared.bind() + app = Outer.bind(Inner.bind(shared), shared) + shared_handle = DeploymentHandle( + "Shared", + app_name="default", + ) + _build_and_check( + app, + expected_ingress_name="Outer", + expected_deployments=[ + Shared.options( + name="Shared", + _init_args=tuple(), + _init_kwargs={}, + ), + Inner.options(name="Inner", _init_args=(shared_handle,), _init_kwargs={}), + Outer.options( + name="Outer", + _init_args=( + DeploymentHandle( + "Inner", + app_name="default", + ), + shared_handle, + ), + _init_kwargs={}, + ), + ], + ) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/unit/test_deployment_class.py b/python/ray/serve/tests/unit/test_deployment_class.py index 75743f6184acd..918ba8c49a3b6 100644 --- a/python/ray/serve/tests/unit/test_deployment_class.py +++ b/python/ray/serve/tests/unit/test_deployment_class.py @@ -7,7 +7,6 @@ from ray import serve from ray.serve._private.config import DeploymentConfig -from ray.serve.deployment import deployment_to_schema, schema_to_deployment def get_random_dict_combos(d: Dict, n: int) -> List[Dict]: @@ -109,47 +108,6 @@ def f(): assert f._deployment_config.user_configured_option_names == set(options.keys()) - @pytest.mark.parametrize("options", deployment_option_combos) - def test_user_configured_option_names_schematized(self, options: Dict): - """Check user_configured_option_names after schematization. - - Args: - options: Maps deployment option strings (e.g. "name", - "num_replicas", etc.) to sample inputs. Pairs come from - TestDeploymentOptions.deployment_options. - """ - - # Some options won't be considered user-configured after schematization - # since the schema doesn't track them. - untracked_options = ["name", "version", "init_args", "init_kwargs"] - - for option in untracked_options: - if option in options: - del options[option] - - @serve.deployment(**options) - def f(): - pass - - schematized_deployment = deployment_to_schema(f) - deschematized_deployment = schema_to_deployment(schematized_deployment) - - # Don't track name in the deschematized deployment since it's optional - # in deployment decorator but required in schema, leading to - # inconsistent behavior. - if ( - "name" - in deschematized_deployment._deployment_config.user_configured_option_names - ): - deschematized_deployment._deployment_config.user_configured_option_names.remove( # noqa: E501 - "name" - ) - - assert ( - deschematized_deployment._deployment_config.user_configured_option_names - == set(options.keys()) - ) - @pytest.mark.parametrize("options", deployment_option_combos) def test_user_configured_option_names_serialized(self, options: Dict): """Check user_configured_option_names after serialization.