[RFC] Async request support in Ray Serve #32292

Open
edoakes opened this issue Feb 7, 2023 · 18 comments
Labels
enhancement Request for new feature and/or capability ray-team-created Ray Team created RFC RFC issues serve Ray Serve Related Issue triage Needs triage (eg: priority, bug/not-bug, and owning component)

Comments

@edoakes
Collaborator

edoakes commented Feb 7, 2023

TL;DR - Proposal for an API to support launching expensive computations in Serve (e.g., model fine-tuning, long-running inference) using an asynchronous request API

Problem statement

With the rise of generative models, the Serve team has seen growing interest in supporting "expensive computations" in Serve. For example, users have asked to launch Stable Diffusion fine-tuning jobs and long-running inference tasks that run not for seconds, but for several minutes to an hour. These tasks are often too long to run as a stateless inference request, but too short to justify launching an entirely new Ray job / cluster.

As workarounds, users are often connecting other queueing systems to Ray, such as Celery. The purpose of this RFC is to gather feedback on APIs for handling such workloads natively in Serve without needing a full-blown queueing system.

Previous proposals

A previous RFC proposed using Ray Workflows as a wholesale replacement for queueing systems. This solution works, but is heavyweight and relies on Workflows, a relatively new library: #21161

Below are two alternative proposals that aim to provide a simpler API.

Proposal 1 -- add async requests API to Serve

Add an "async_request" decorator to Serve deployments. For methods decorated with it, Serve will generate queueing preamble/postamble logic and APIs for listing, resuming, and checking the status of async requests. The API would look something like the following:

@serve.deployment
class FineTuningApp:
    @serve.async_request(
        max_queued=1000,
        max_running=10,
        priority=2,
        idempotency_key="request_id",
    )
    def fine_tune_request(self, request_id: str, num_epochs: int, dataset: str) -> str:
        # User-implemented checkpoint and recovery.
        if checkpoint_exists(request_id):
            model, epoch = restore_from_checkpoint(request_id)
        else:
            model, epoch = new_model(), 0
        for i in range(epoch, num_epochs):
            train_one_epoch(model, dataset)
            save_checkpoint(model, i, request_id)
        return model

Here are examples of generated API methods for managing async requests:

POST /app/fine_tune_request?request_id=12345&dataset=foo1&num_epochs=5
POST /app/fine_tune_request/cancel?request_id=12345
GET /app/fine_tune_request/status?request_id=12345
GET /app/fine_tune_request/result?request_id=12345
GET /app/fine_tune_request/list?filter_status=RUNNING&limit=1000
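
To make the semantics of these generated endpoints concrete, here is a minimal in-memory sketch of the bookkeeping they imply. It is not part of Ray Serve: the class `AsyncRequestRegistry`, its method names, and the status strings other than RUNNING are assumptions made for illustration, as is the choice that a lower `priority` number is served first and that priority is passed per request.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class AsyncRequest:
    request_id: str
    args: Dict[str, Any]
    priority: int
    status: str = "QUEUED"  # QUEUED -> RUNNING -> FINISHED, or CANCELLED
    result: Any = None


class AsyncRequestRegistry:
    """Hypothetical in-memory bookkeeping; not part of Ray Serve."""

    def __init__(self, max_queued: int, max_running: int):
        self.max_queued = max_queued
        self.max_running = max_running
        self._requests: Dict[str, AsyncRequest] = {}
        self._heap: List[tuple] = []  # (priority, arrival order, request_id)
        self._arrival = itertools.count()

    def _count(self, status: str) -> int:
        return sum(r.status == status for r in self._requests.values())

    def submit(self, request_id: str, priority: int = 0, **args: Any) -> str:
        # Idempotency key: a repeated request_id returns the existing status.
        if request_id in self._requests:
            return self._requests[request_id].status
        if self._count("QUEUED") >= self.max_queued:
            raise RuntimeError("queue is full")
        self._requests[request_id] = AsyncRequest(request_id, args, priority)
        heapq.heappush(self._heap, (priority, next(self._arrival), request_id))
        return "QUEUED"

    def start_next(self) -> Optional[AsyncRequest]:
        """Move the best queued request to RUNNING if a slot is free."""
        if self._count("RUNNING") >= self.max_running:
            return None
        while self._heap:
            _, _, request_id = heapq.heappop(self._heap)
            request = self._requests[request_id]
            if request.status == "QUEUED":  # skip requests cancelled in the queue
                request.status = "RUNNING"
                return request
        return None

    def finish(self, request_id: str, result: Any) -> None:
        request = self._requests[request_id]
        if request.status == "RUNNING":  # a cancelled request keeps its status
            request.status, request.result = "FINISHED", result

    def cancel(self, request_id: str) -> bool:
        request = self._requests[request_id]
        if request.status in ("QUEUED", "RUNNING"):
            request.status = "CANCELLED"
            return True
        return False

    def status(self, request_id: str) -> str:
        return self._requests[request_id].status

    def result(self, request_id: str) -> Any:
        return self._requests[request_id].result

    def list_requests(self, filter_status: Optional[str] = None,
                      limit: int = 1000) -> List[AsyncRequest]:
        matches = [r for r in self._requests.values()
                   if filter_status is None or r.status == filter_status]
        return matches[:limit]
```

In this sketch, `submit`, `cancel`, `status`, `result`, and `list_requests` correspond to the five endpoints above, while `start_next` and `finish` stand in for whatever scheduler would actually run the decorated method.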

Fault tolerance: Serve would persist the queue of requests in its coordinator actor / persistent storage. When resuming from cluster failure, Serve can load and resume previous running async requests, which can resume from any checkpoints they have taken.
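
As a rough illustration of the user-side half of this, the sketch below fills in the checkpoint helpers that the example handler calls. Everything here is an assumption: a local temporary directory stands in for durable storage, the "model" is a plain dict, and the `fail_after` parameter exists only to simulate a crash. The checkpoint stores the index of the next epoch to run, so a resumed request does not repeat the last finished epoch.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional, Tuple

# Assumption: a local directory stands in for durable storage such as S3.
CHECKPOINT_DIR = Path(tempfile.mkdtemp())


def _path(request_id: str) -> Path:
    return CHECKPOINT_DIR / f"{request_id}.json"


def checkpoint_exists(request_id: str) -> bool:
    return _path(request_id).exists()


def save_checkpoint(model: dict, next_epoch: int, request_id: str) -> None:
    # Write to a temporary file and rename it, so a crash in the middle of a
    # write never leaves a truncated checkpoint behind.
    tmp = _path(request_id).with_suffix(".tmp")
    tmp.write_text(json.dumps({"model": model, "next_epoch": next_epoch}))
    tmp.replace(_path(request_id))


def restore_from_checkpoint(request_id: str) -> Tuple[dict, int]:
    data = json.loads(_path(request_id).read_text())
    return data["model"], data["next_epoch"]


def fine_tune(request_id: str, num_epochs: int,
              fail_after: Optional[int] = None) -> dict:
    if checkpoint_exists(request_id):
        model, epoch = restore_from_checkpoint(request_id)
    else:
        model, epoch = {"epochs_trained": 0}, 0
    for i in range(epoch, num_epochs):
        if fail_after is not None and i == fail_after:
            raise RuntimeError("simulated crash")
        model["epochs_trained"] += 1  # stand-in for train_one_epoch
        save_checkpoint(model, i + 1, request_id)
    return model
```

Calling `fine_tune("req-1", 5, fail_after=3)` and then `fine_tune("req-1", 5)` runs each of the five epochs exactly once across the two calls.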

Pros:

  • Simple extension of existing Serve handlers, which means requests can take advantage of existing Serve scheduling, autoscaling, observability, etc. logic

Cons:

  • May not interact well with other Serve APIs like DeploymentGraphs

Proposal 2 -- create a simplified TaskQueue API backed by Ray Workflows

Instead of extending Serve's API, create a separate TaskQueue API that users can use to manually create a Serve handler implementing management methods. For example, the above example could be instead implemented as:

# Define the processing function separately from the handler.
def fine_tune_request(request_id: str, num_epochs: int, dataset: str) -> str:
    # User-implemented checkpoint and recovery.
    if checkpoint_exists(request_id):
        model, epoch = restore_from_checkpoint(request_id)
    else:
        model, epoch = new_model(), 0
    for i in range(epoch, num_epochs):
        train_one_epoch(model, dataset)
        save_checkpoint(model, i, request_id)
    return model

@serve.deployment
class FineTuningApp:
    def __init__(self):
        TaskQueue.create_if_not_exists("my_queue")
        TaskQueue.resume_all("my_queue")

    def get_or_create_request(self, request_id: str, num_epochs: int, dataset: str):
        TaskQueue.submit_task("my_queue", fine_tune_request, request_id, args=[request_id, num_epochs, dataset])
        return "ok"

    def get_status(self, request_id: str):
        return TaskQueue.get_status("my_queue", request_id)

    def get_result(self, request_id: str):
        return TaskQueue.get_result("my_queue", request_id)

    def cancel_task(self, request_id: str):
        return TaskQueue.cancel("my_queue", request_id)

    def list_tasks(self):
        return TaskQueue.list("my_queue")

Fault tolerance: can be implemented in a similar way as proposal (1).
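
To show how the TaskQueue calls used in the handler above might behave, here is a single-process sketch built on Python's `concurrent.futures`. It is a stand-in under stated assumptions, not the proposed library: the class name `InMemoryTaskQueue`, the `max_running` parameter, the status strings, and the name `list_tasks` (used instead of `list`) are hypothetical, and there is no persistence, so `resume_all` has no counterpart here.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional


class InMemoryTaskQueue:
    """Hypothetical single-process stand-in for the proposed TaskQueue."""

    _queues: Dict[str, "InMemoryTaskQueue"] = {}

    def __init__(self, max_running: int):
        self._pool = ThreadPoolExecutor(max_workers=max_running)
        self._futures: Dict[str, Future] = {}

    @classmethod
    def create_if_not_exists(cls, name: str, max_running: int = 2) -> None:
        if name not in cls._queues:
            cls._queues[name] = cls(max_running)

    @classmethod
    def submit_task(cls, name: str, fn: Callable[..., Any], task_id: str,
                    args: Optional[List[Any]] = None) -> None:
        queue = cls._queues[name]
        # The task ID acts as an idempotency key: resubmissions are ignored.
        if task_id not in queue._futures:
            queue._futures[task_id] = queue._pool.submit(fn, *(args or []))

    @classmethod
    def get_status(cls, name: str, task_id: str) -> str:
        future = cls._queues[name]._futures[task_id]
        if future.cancelled():
            return "CANCELLED"
        if future.running():
            return "RUNNING"
        if future.done():
            return "FAILED" if future.exception() else "FINISHED"
        return "PENDING"

    @classmethod
    def get_result(cls, name: str, task_id: str,
                   timeout: Optional[float] = None) -> Any:
        # Blocks until the task finishes or the timeout expires.
        return cls._queues[name]._futures[task_id].result(timeout=timeout)

    @classmethod
    def cancel(cls, name: str, task_id: str) -> bool:
        # Future.cancel() only succeeds while the task is still pending.
        return cls._queues[name]._futures[task_id].cancel()

    @classmethod
    def list_tasks(cls, name: str) -> Dict[str, str]:
        queue = cls._queues[name]
        return {task_id: cls.get_status(name, task_id)
                for task_id in queue._futures}
```

A thread pool cannot interrupt a task that is already running, which is one reason a real implementation would more likely be backed by Ray tasks or Workflows, as the proposal suggests.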

Pros:

  • Serve API remains unchanged

Cons:

  • More boilerplate / less clean story for users
  • More complex autoscaling and resource allocation story, since both Serve and the TaskQueue library would be requesting resources from the Ray scheduler
  • More new concepts and things to know
@edoakes edoakes added serve Ray Serve Related Issue RFC RFC issues labels Feb 7, 2023
@ericl ericl pinned this issue Feb 7, 2023
@ahmadshadid

ahmadshadid commented Feb 9, 2023

Greetings @edoakes, I strongly endorse Proposal 2. To elaborate: our team is developing a quant algorithm that trains and executes trades in real time, and Proposal 2 aligns perfectly with our objectives.
Once it is implemented, we will be able to initialize fully functional trading workflows, each focused on a single asset, using the Ray Serve framework. Whenever the market becomes liquid, we will simply request the initialization of an additional workflow for each new stock we would like to trade (1000+ stocks). Proposal 2 presents a more streamlined and efficient solution for our use case.

@ericl
Contributor

ericl commented Feb 9, 2023

Thanks @WBORSA . Could you elaborate on the advantages / disadvantages for your use case of (2) and (1)?

@ericl ericl unpinned this issue Feb 15, 2023
@richardliaw
Contributor

@danielbichuetti saw that you reacted to the above post -- did you have any feedback/thoughts to share?

@Ericxgao

Hello, we use the Ray job submission queue quite often to spin up quick inference jobs and hack in new features. It is quick, but being able to persist jobs across Ray restarts (or to rerun failed jobs) seems very important, and Serve seems to offer that.

If it's possible to run separate Python scripts as a Serve job it would be great.

@danielbichuetti

@richardliaw Hello!

When @edoakes mentioned Celery being used as Ray lacked its functionality, it was a valid conclusion for our company. It is difficult to compare the code quality between Ray and Celery, and while Celery is not a bad solution, Ray is far superior.

I am personally testing and using Ray Workflows in some scenarios, yet the queue concept is sometimes needed, and it can be quite laborious to establish an effective queue system that integrates well with Ray. Thus, the addition of this feature would be immensely beneficial to us, as we have occasional tasks that take anywhere between 3 and 8 hours.

IMHO, Proposal 1 would be more advantageous to both current and new users, and would reduce allocation issues. The DAG concerns, at least for us, don't apply.

@richardliaw
Contributor

As mentioned by @Joshuaalbert, seems like #26642 is highly related as well

@Joshuaalbert

Hi! I really support this! A colleague and I were at the Summit 2022 and had some meetings with the team to see if they could support our use case, which turns out to be the same use case as that of the now-popular generative models like ChatGPT.

Our team at Touch has implemented our own solution on top of Serve to solve our use case, and the system has gone through 3 iterations of improvement. We are happy to share what we've tried and learned, and we strongly support any dev effort to make this a mainline feature.

Our use case

Our use case is similar to the "chatbot" use case, where requests are made on behalf of a user and must be stateful. The required latency is low enough that passing state objects around would add intractable overhead. Regarding async vs. sync handling, most of our work can be done synchronously, but not all. Having the option for both types of request handling while also allowing stateful routing of requests would be ideal.

Breaking down each use case

I think Serve should be able to do any combination of these two options.

  1. Statefulness -- Does the use case require a state persistent between successive requests?
  2. Async/Sync Request Handling -- Does the use case require synchronous or asynchronous request handling?

The answers above create a 2x2 matrix:

+-----------------+------------+----------------+
|        X        |  Stateful  |  Stateless     |
+-----------------+------------+----------------+
| Async. requests | Use Case A | Use Case C     |
| Sync. requests  | Use Case B | Standard Serve |
+-----------------+------------+----------------+

Examples of use cases:

  • A: an application might map a user UUID to sequential calls with a conversational language model, where responses stream to a frontend over websocket.
  • B: the same as A, but when the computations are fast enough you don't need the complexity of websockets to handle responses.
  • C: an application might start up a stateless computation that manages some part of your ecosystem in an asynchronous manner

We need A and B at Touch. I think most teams that need stateful Serve will need both A and B. Typically you first develop a concept with stateful synchronous response handling (B), and then only after demonstrating correctness add the complexity of stateful asynchronous handling (A).

We also use C, where we have multiple asynchronous processes that simply perform things on a scheduled basis (similar to how ServeController works) which need access to Ray. We have at least 4 such scheduled operations.

What we've learned

  1. The best approach we've found is to route requests to specific key-mapped actors, and for those actors to be special in that they have the concept of managing exclusive session objects that hold all the state per request series. Key here is to never let the exclusive nature fail, i.e. you never want a state change from one request handling to affect the state of another request handling series.
  2. Take care of session lifetime management from the start. Incorporate the notion of a timeout, after which, if there has been no activity, you safely terminate the session. We do this with an async "pruning" process that manages the routing table (similar to ServeController).
  3. Expect and handle zombie sessions that persist in the routing table.
  4. After an actor has handled some number of request series, let it go to pasture to die peacefully once all the sessions have expired. This practice was initially conceived of by Oracle and is really valuable in maintaining health in production.
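
The four lessons above can be sketched as a small routing table. This is only an illustration under assumptions, not Touch's implementation: `SessionRouter`, `Worker`, and `Session` are hypothetical names, plain Python objects stand in for Ray actors, and the clock is injectable so that timeouts can be exercised without sleeping.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Worker:
    name: str  # stands in for a key-mapped Ray actor
    sessions_started: int = 0
    retiring: bool = False


@dataclass
class Session:
    worker: Worker
    last_active: float
    state: dict = field(default_factory=dict)  # exclusive to one request series


class SessionRouter:
    def __init__(self, ttl_s: float, max_sessions_per_worker: int,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl_s = ttl_s
        self.max_sessions_per_worker = max_sessions_per_worker
        self.workers: List[Worker] = []
        self._sessions: Dict[str, Session] = {}
        self._clock = clock
        self._created = 0

    def _pick_worker(self) -> Worker:
        for worker in self.workers:
            if not worker.retiring:
                return worker
        worker = Worker(f"worker-{self._created}")
        self._created += 1
        self.workers.append(worker)
        return worker

    def route(self, key: str) -> Session:
        """Return the session for this key, creating it on first use."""
        now = self._clock()
        session = self._sessions.get(key)
        if session is None:
            worker = self._pick_worker()
            worker.sessions_started += 1
            # After enough sessions, stop sending new ones to this worker.
            if worker.sessions_started >= self.max_sessions_per_worker:
                worker.retiring = True
            session = Session(worker, now)
            self._sessions[key] = session
        session.last_active = now
        return session

    def prune(self) -> List[str]:
        """Drop idle sessions, then retire workers with no sessions left."""
        now = self._clock()
        expired = [key for key, s in self._sessions.items()
                   if now - s.last_active > self.ttl_s]
        for key in expired:
            del self._sessions[key]
        live = {s.worker.name for s in self._sessions.values()}
        self.workers = [w for w in self.workers
                        if not (w.retiring and w.name not in live)]
        return expired
```

Running `prune` periodically covers both the timeout and the zombie-session cases, since any entry that stops receiving requests eventually ages out of the table.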

@richardliaw
Contributor

Miha's comment on Slack:

We had a use case that was implemented similar to Approach 1, using Serve and Core tasks.

Feedback: I prefer approach 1, because it's a much cleaner story and less exotic concept than 2.

Also, assuming requests will route through ServeController, it might be nice to be able to spin down the Serve replicas when the queue is empty, using Serve autoscaler, and spin them back up once the queue gets requests.
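
The scale-to-zero idea in the last paragraph amounts to a very small policy function. The sketch below is a hypothetical illustration, not the Serve autoscaler: `desired_replicas` and its parameters are made-up names, and it assumes the queue length and running count are available to whatever makes the scaling decision.

```python
import math


def desired_replicas(queued: int, running: int,
                     requests_per_replica: int, max_replicas: int) -> int:
    """Replica count for a queue-driven deployment that may scale to zero."""
    if requests_per_replica < 1:
        raise ValueError("requests_per_replica must be at least 1")
    outstanding = queued + running
    if outstanding == 0:
        return 0  # empty queue: spin all replicas down
    return min(max_replicas, math.ceil(outstanding / requests_per_replica))
```

Counting running requests as well as queued ones keeps a replica alive until its in-flight work has finished.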

@thatcort

thatcort commented Feb 23, 2023

A few thoughts:

  • Please integrate well with the existing Workflow APIs! Ray becomes too confusing when there are different/competing ways of doing similar things. Make the integrations explicit, not hidden under the hood
  • Proposal 2 seems much cleaner, more robust and extensible. For example, with proposal 1, queues are tied to specific endpoints which will limit user API design. Plus the decorators are already confusing with FastAPI, Ray and now queuing options all intermingled.
  • Related to this, I'm in the process of building caching of computation results between Serve calls. A few extra parameters on results, like TTL would be really helpful. Or making the implicit caching aspect of proposal 2 explicitly configurable and distinct from the queuing aspect.
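
As a sketch of what a TTL on results could mean in practice, the following cache recomputes a value only once its entry has expired. It assumes nothing about Serve or Workflows: `TTLResultCache` and `get_or_compute` are hypothetical names, and the injectable clock is there only to make expiry easy to exercise.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLResultCache:
    """Hypothetical result cache keyed by request parameters."""

    def __init__(self, ttl_s: float,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl_s = ttl_s
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}  # key -> (expiry, value)

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now < entry[0]:
            return entry[1]  # still fresh: reuse the stored result
        value = compute()
        self._entries[key] = (now + self.ttl_s, value)
        return value
```

Keeping the cache separate from the queue, as here, matches the request to make caching explicitly configurable and distinct from queuing.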

Finally, @edoakes could you please elaborate on why Workflows are too heavyweight? Do you mean the API is too cumbersome or that the durability guarantees add too much processing overhead?

@ahmadshadid

Wouldn't Proposal 2 be more scalable in the long term? I mean, going with it would even allow you to initialize multiple Ray Serve instances. Correct me if I am wrong.

@richardliaw richardliaw added the ray-team-created Ray Team created label Apr 19, 2023
@allemp

allemp commented Oct 3, 2023

I'd be interested in a solution that would allow on-demand fine-tuning of models, or even on-demand training of small models. For example:

  1. User makes a train_model(params) request to Serve
  2. Ray starts training, validating and tuning the model
  3. User makes a predict(x) request to Serve

This would be useful when creating web applications where the user can use the UI to make small changes to the ML pipeline (selecting features, filtering training data, choosing thresholds etc.) where pre-computing each combination of metaparameters would be infeasible.

@kyle-v6x

kyle-v6x commented Dec 14, 2023

We're working on a similar system to fine-tune some small models for users. Our current solution:

  1. Run a dispatcher deployment which accepts requests to begin training.
  2. Dispatcher verifies the request and then calls a new actor with the required training code.
  3. This Actor's task is called async
  4. We return the TaskID and make calls to ray_state.get_task for scheduling status.

In actuality, the Task is tied to an S3 object associated with the fine-tuning run, which we poll for persistent status.

Roughly:

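import ray
from fastapi import FastAPI, Request
from ray import serve

app = FastAPI()


@ray.remote(num_gpus=1.0, num_cpus=4.0)
class Trainer:
    def train(self):
        # Long-running task goes here.
        # Save results to S3, etc.
        pass


@serve.deployment()
@serve.ingress(app)
class Dispatcher:
    @app.post("/train")
    async def call_trainer(self, http_request: Request):
        body = await http_request.json()  # training parameters sent by the client

        # Start a dedicated actor and launch training without waiting for it.
        trainer = Trainer.remote()
        ref = trainer.train.remote()

        # Return the task ID so the caller can poll for scheduling status.
        return str(ref.task_id())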

The major issue here is that persistence relies on something like S3 and, more importantly, Serve will downscale Dispatcher replicas that have active training runs (since their requests have already returned), which results in Ray ending the train call before completion. So the Dispatcher cannot be autoscaled safely.

Both suggestions above work, but I prefer the second option, as we have a deployment graph for pre-processing and option 2 seems easier to work around. It's also more similar to the workflow of Ray Core.

@zhe-thoughts
Collaborator

@kyle-v6x this is a very interesting scenario. I wonder if you have 30 minutes for a Zoom call to discuss it further. If so, please email zhz at anyscale.com. Thanks!

@thatcort

We implemented async Serve calls using the existing Workflows API. The Workflow provides a reference value that can be shared with the caller and used to look up the state of the request. It seems to be working well. The only missing piece is rate limiting. I'm curious why the previous proposal to build on top of Workflows was abandoned?

@Jainish-S

How do you handle multiple training requests?

@arita37

arita37 commented Mar 24, 2024

@kyle-v6x :

Relying on a data lake or disk for task persistence is a good idea:
Pros: No need for an extra DB server...
Cons: Latency and concurrency issues, but for long batch jobs this is less of a problem.

I think an all-in-one solution is very difficult, since requirements are very different.

@kyle-v6x

Sorry for the late reply.

Since we're using the Ray Cluster Launcher, new training nodes are automatically added to handle however many requests come in. If none are available, the requests are queued in the Ray task queue. Note that you still have to handle potential Ray failures yourself, as the Ray queue will not.

@kyle-v6x

Note that since 2.10.0 async functionality for serve deployments is partially broken. Details here.

@anyscalesam anyscalesam added enhancement Request for new feature and/or capability triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Sep 12, 2024