From 928f1de06548a41fda9b118264e0dafe966cdee6 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 06:59:29 -0700 Subject: [PATCH 01/19] First try --- vllm/executor/ray_gpu_executor.py | 58 +++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 4a6825c01fcf8..7e9eda3b34a5b 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -1,4 +1,5 @@ import asyncio +import concurrent.futures import os from collections import defaultdict from itertools import islice, repeat @@ -22,6 +23,8 @@ logger = init_logger(__name__) +PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE = False + class RayGPUExecutor(DistributedGPUExecutor): @@ -358,7 +361,9 @@ def _run_workers( # Just return futures return ray_worker_outputs - driver_worker_output = [] + def get_worker_outputs(): + return ray.get(ray_worker_outputs) + # In SPMD mode, the driver worker is the same as any other worker, # so we only explicitly execute on the driver worker if using a # non-SPMD worker class. @@ -368,23 +373,46 @@ def _run_workers( # Start the driver worker after all the ray workers. if not use_dummy_driver: - driver_worker_output = [ - self.driver_worker.execute_method(method, *driver_args, - **driver_kwargs) - ] + # Concurrently poll driver worker and remote ray workers + # in background thread to avoid deadlock when performing + # distributed init + # (see: https://github.com/vllm-project/vllm/issues/3455) + if PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE: + with concurrent.futures.ThreadPoolExecutor( + max_workers=2) as executor: + driver_poll_thread = executor.submit( + self.driver_worker.execute_method, method, + *driver_args, **driver_kwargs) + worker_poll_thread = executor.submit( + get_worker_outputs) + + for completed_future in concurrent.futures.as_completed( + [driver_poll_thread, worker_poll_thread]): + # Will raise exception if underlying thread raises + res = completed_future.result() + if not isinstance(res, list): + driver_output = [res] + else: + worker_outputs = res + all_worker_outputs = driver_output + worker_outputs + else: + ray.get([ + ray.put(self.driver_worker.execute_method, method, * + driver_args, **driver_kwargs) + ] + ray_worker_outputs) else: assert self.driver_dummy_worker is not None - driver_worker_output = [ - ray.get( - self.driver_dummy_worker.execute_method.remote( - method, *driver_args, **driver_kwargs)) - ] - - # Get the results of the ray workers. - if self.workers: - ray_worker_outputs = ray.get(ray_worker_outputs) + # Concurrently poll remote driver worker and remote ray workers + # to avoid deadlock + # (see: https://github.com/vllm-project/vllm/issues/3455) + all_worker_outputs = ray.get([ + self.driver_dummy_worker.execute_method.remote( + method, *driver_args, **driver_kwargs) + ] + ray_worker_outputs) + else: + all_worker_outputs = ray.get(ray_worker_outputs) - return driver_worker_output + ray_worker_outputs + return all_worker_outputs def _wait_for_tasks_completion(self, parallel_worker_tasks: Any) -> None: """Wait for futures returned from _run_workers() with From 8ade876c786417ee6e1b2a99036b92262d3e4aae Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:03:23 -0700 Subject: [PATCH 02/19] simplify --- vllm/executor/ray_gpu_executor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 7e9eda3b34a5b..092688107b0c7 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -361,9 +361,6 @@ def _run_workers( # Just return futures return ray_worker_outputs - def get_worker_outputs(): - return ray.get(ray_worker_outputs) - # In SPMD mode, the driver worker is the same as any other worker, # so we only explicitly execute on the driver worker if using a # non-SPMD worker class. @@ -384,7 +381,7 @@ def get_worker_outputs(): self.driver_worker.execute_method, method, *driver_args, **driver_kwargs) worker_poll_thread = executor.submit( - get_worker_outputs) + ray.get, ray_worker_outputs) for completed_future in concurrent.futures.as_completed( [driver_poll_thread, worker_poll_thread]): From e56448b9c73469e82e63ca91992d3a4d82c786fb Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:08:23 -0700 Subject: [PATCH 03/19] minor error --- vllm/executor/ray_gpu_executor.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 092688107b0c7..454f0ea0756cc 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -375,6 +375,12 @@ def _run_workers( # distributed init # (see: https://github.com/vllm-project/vllm/issues/3455) if PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE: + ray.get([ + ray.put( + self.driver_worker.execute_method( + method, *driver_args, **driver_kwargs)) + ] + ray_worker_outputs) + else: with concurrent.futures.ThreadPoolExecutor( max_workers=2) as executor: driver_poll_thread = executor.submit( @@ -392,11 +398,6 @@ def _run_workers( else: worker_outputs = res all_worker_outputs = driver_output + worker_outputs - else: - ray.get([ - ray.put(self.driver_worker.execute_method, method, * - driver_args, **driver_kwargs) - ] + ray_worker_outputs) else: assert self.driver_dummy_worker is not None # Concurrently poll remote driver worker and remote ray workers From a808cb01e8f6e2b6d3ba21a7ab4789b4b0c68c26 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:08:44 -0700 Subject: [PATCH 04/19] Try with object store first --- vllm/executor/ray_gpu_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 454f0ea0756cc..c15b871f2316c 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -23,7 +23,7 @@ logger = init_logger(__name__) -PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE = False +PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE = True class RayGPUExecutor(DistributedGPUExecutor): From f67336d7766623ecacf9b4dd7480de665cf2f8de Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:12:14 -0700 Subject: [PATCH 05/19] move comments around --- vllm/executor/ray_gpu_executor.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index c15b871f2316c..f15dfe9900ad5 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -365,15 +365,14 @@ def _run_workers( # so we only explicitly execute on the driver worker if using a # non-SPMD worker class. if not self.use_ray_spmd_worker: + # Concurrently poll driver worker and remote ray workers + # to avoid deadlock when performing distributed init + # (see: https://github.com/vllm-project/vllm/issues/3455) driver_args = args if all_args is None else all_args[0] driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] - # Start the driver worker after all the ray workers. + # Start the driver worker task after all the ray workers'. if not use_dummy_driver: - # Concurrently poll driver worker and remote ray workers - # in background thread to avoid deadlock when performing - # distributed init - # (see: https://github.com/vllm-project/vllm/issues/3455) if PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE: ray.get([ ray.put( @@ -400,9 +399,6 @@ def _run_workers( all_worker_outputs = driver_output + worker_outputs else: assert self.driver_dummy_worker is not None - # Concurrently poll remote driver worker and remote ray workers - # to avoid deadlock - # (see: https://github.com/vllm-project/vllm/issues/3455) all_worker_outputs = ray.get([ self.driver_dummy_worker.execute_method.remote( method, *driver_args, **driver_kwargs) From 54652fe2e3a79e66345f3b5ca6bdc200548b74de Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:13:31 -0700 Subject: [PATCH 06/19] minor errors --- vllm/executor/ray_gpu_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index f15dfe9900ad5..c0955dea8db77 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -374,7 +374,7 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: if PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE: - ray.get([ + all_worker_outputs = ray.get([ ray.put( self.driver_worker.execute_method( method, *driver_args, **driver_kwargs)) @@ -396,7 +396,7 @@ def _run_workers( driver_output = [res] else: worker_outputs = res - all_worker_outputs = driver_output + worker_outputs + all_worker_outputs = driver_output + worker_outputs else: assert self.driver_dummy_worker is not None all_worker_outputs = ray.get([ From 6476726043215541f605fd517cfb90fdd31abf8f Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:15:09 -0700 Subject: [PATCH 07/19] improve comments --- vllm/executor/ray_gpu_executor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index c0955dea8db77..73f75255ff61e 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -380,6 +380,9 @@ def _run_workers( method, *driver_args, **driver_kwargs)) ] + ray_worker_outputs) else: + # If not wanting to always store driver result in ray object + # store one can simply poll driver and worker tasks concurrently + # in background threads with concurrent.futures.ThreadPoolExecutor( max_workers=2) as executor: driver_poll_thread = executor.submit( From eb05c935ea655d3dbc2f6d1f76255ff8bfb05ff9 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:16:24 -0700 Subject: [PATCH 08/19] improve code style --- vllm/executor/ray_gpu_executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 73f75255ff61e..533693bb133a3 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -402,10 +402,10 @@ def _run_workers( all_worker_outputs = driver_output + worker_outputs else: assert self.driver_dummy_worker is not None - all_worker_outputs = ray.get([ - self.driver_dummy_worker.execute_method.remote( - method, *driver_args, **driver_kwargs) - ] + ray_worker_outputs) + driver_output = self.driver_dummy_worker.execute_method.remote( + method, *driver_args, **driver_kwargs) + all_worker_outputs = ray.get([driver_output] + + ray_worker_outputs) else: all_worker_outputs = ray.get(ray_worker_outputs) From 68d47d3430388fc573f3f3c09b179892cd92790f Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:17:11 -0700 Subject: [PATCH 09/19] minor --- vllm/executor/ray_gpu_executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 533693bb133a3..f272ad71b3866 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -380,9 +380,9 @@ def _run_workers( method, *driver_args, **driver_kwargs)) ] + ray_worker_outputs) else: - # If not wanting to always store driver result in ray object - # store one can simply poll driver and worker tasks concurrently - # in background threads + # If not wanting to always store driver result in ray + # object store one can simply poll driver and worker + # task concurrently in background threads with concurrent.futures.ThreadPoolExecutor( max_workers=2) as executor: driver_poll_thread = executor.submit( From 89382c15757610b8fb84d47c56246fd2a4e29cab Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:21:51 -0700 Subject: [PATCH 10/19] remove incorrect solution --- vllm/executor/ray_gpu_executor.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index f272ad71b3866..32000dd75a8eb 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -23,8 +23,6 @@ logger = init_logger(__name__) -PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE = True - class RayGPUExecutor(DistributedGPUExecutor): @@ -373,12 +371,13 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: - if PLACE_DRIVER_RESULT_IN_RAY_OBJECT_STORE: - all_worker_outputs = ray.get([ - ray.put( - self.driver_worker.execute_method( - method, *driver_args, **driver_kwargs)) - ] + ray_worker_outputs) + if not ray_worker_outputs: + # Corner case; no special handling as no concurrency with + # worker tasks is involved + all_worker_outputs = [ + self.driver_worker.execute_method( + method, *driver_args, **driver_kwargs) + ] else: # If not wanting to always store driver result in ray # object store one can simply poll driver and worker From 210f1ed8f13ba0aca7b6b4f0d1d3da74cb19f042 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:22:29 -0700 Subject: [PATCH 11/19] minor --- vllm/executor/ray_gpu_executor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 32000dd75a8eb..cdbfe3967b6d8 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -379,9 +379,8 @@ def _run_workers( method, *driver_args, **driver_kwargs) ] else: - # If not wanting to always store driver result in ray - # object store one can simply poll driver and worker - # task concurrently in background threads + # Poll driver and worker tasks concurrently + # in background threads with concurrent.futures.ThreadPoolExecutor( max_workers=2) as executor: driver_poll_thread = executor.submit( From 9217cb0743909a85ac19e78c240ad4371f2b1752 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:31:24 -0700 Subject: [PATCH 12/19] Make background thread opt-in --- vllm/executor/ray_gpu_executor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index cdbfe3967b6d8..d5318c68cf5aa 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -240,7 +240,9 @@ def sort_by_driver_then_worker_ip(worker): ] self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) - self._run_workers("init_device") + # must run driver in background thread if len(workers) > 0 to avoid + # NCCL init deadlock (https://github.com/vllm-project/vllm/pull/7159) + self._run_workers("init_device", run_driver_in_background_thread=True) self._run_workers("load_model", max_concurrent_workers=self.parallel_config. max_parallel_loading_workers) @@ -310,6 +312,7 @@ def _run_workers( all_kwargs: Optional[List[Dict[str, Any]]] = None, use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, + run_driver_in_background_thread: bool = False, **kwargs, ) -> Any: """Runs the given method on all workers. Can be used in the following @@ -371,7 +374,7 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: - if not ray_worker_outputs: + if not ray_worker_outputs or not run_driver_in_background_thread: # Corner case; no special handling as no concurrency with # worker tasks is involved all_worker_outputs = [ From d6d4ee759d3cfe1398b5800164bba10b1c7a10d4 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:33:53 -0700 Subject: [PATCH 13/19] improve comments --- vllm/executor/ray_gpu_executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index d5318c68cf5aa..19417d31ecf07 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -374,9 +374,9 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: - if not ray_worker_outputs or not run_driver_in_background_thread: - # Corner case; no special handling as no concurrency with - # worker tasks is involved + if not run_driver_in_background_thread or not ray_worker_outputs: + # no background thread required when there are + # no concurrent worker tasks all_worker_outputs = [ self.driver_worker.execute_method( method, *driver_args, **driver_kwargs) From 3af1eb484b48b30a1d5e6935d2e07e35b83958f3 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:34:44 -0700 Subject: [PATCH 14/19] improve --- vllm/executor/ray_gpu_executor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 19417d31ecf07..bc919538410ce 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -374,9 +374,10 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: - if not run_driver_in_background_thread or not ray_worker_outputs: - # no background thread required when there are - # no concurrent worker tasks + if (not run_driver_in_background_thread + # no background thread required when there are + # no concurrent worker tasks + or not ray_worker_outputs): all_worker_outputs = [ self.driver_worker.execute_method( method, *driver_args, **driver_kwargs) From 0308fd36ce64b64618ca35d94d6458f36f5b1c94 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:43:32 -0700 Subject: [PATCH 15/19] improve --- vllm/executor/ray_gpu_executor.py | 53 +++++++++++++++++-------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index bc919538410ce..39caa8acd8b79 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -374,34 +374,39 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: - if (not run_driver_in_background_thread + # Driver task will run in this python process + if run_driver_in_background_thread: + if not ray_worker_outputs: # no background thread required when there are # no concurrent worker tasks - or not ray_worker_outputs): - all_worker_outputs = [ - self.driver_worker.execute_method( + driver_output = self.driver_worker.execute_method( method, *driver_args, **driver_kwargs) - ] + all_worker_outputs = [driver_output] + else: + # Poll driver and worker tasks concurrently + # in background threads + with concurrent.futures.ThreadPoolExecutor( + max_workers=2) as executor: + driver_poll_thread = executor.submit( + self.driver_worker.execute_method, method, + *driver_args, **driver_kwargs) + worker_poll_thread = executor.submit( + ray.get, ray_worker_outputs) + + for completed_future in concurrent.futures.as_completed( + [driver_poll_thread, worker_poll_thread]): + # Will raise exception if underlying thread raises + res = completed_future.result() + if not isinstance(res, list): + driver_output = [res] + else: + worker_outputs = res + all_worker_outputs = driver_output + worker_outputs else: - # Poll driver and worker tasks concurrently - # in background threads - with concurrent.futures.ThreadPoolExecutor( - max_workers=2) as executor: - driver_poll_thread = executor.submit( - self.driver_worker.execute_method, method, - *driver_args, **driver_kwargs) - worker_poll_thread = executor.submit( - ray.get, ray_worker_outputs) - - for completed_future in concurrent.futures.as_completed( - [driver_poll_thread, worker_poll_thread]): - # Will raise exception if underlying thread raises - res = completed_future.result() - if not isinstance(res, list): - driver_output = [res] - else: - worker_outputs = res - all_worker_outputs = driver_output + worker_outputs + driver_output = self.driver_worker.execute_method( + method, *driver_args, **driver_kwargs) + all_worker_outputs = [driver_output + ] + ray.get(ray_worker_outputs) else: assert self.driver_dummy_worker is not None driver_output = self.driver_dummy_worker.execute_method.remote( From 9fc5ab028fe975aa49ffbcbbe4d3741dd99f97d6 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:46:28 -0700 Subject: [PATCH 16/19] minor improve --- vllm/executor/ray_gpu_executor.py | 49 +++++++++++++------------------ 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 39caa8acd8b79..0c5a1e555bd52 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -1,7 +1,7 @@ import asyncio -import concurrent.futures import os from collections import defaultdict +from concurrent import futures from itertools import islice, repeat from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple @@ -375,33 +375,26 @@ def _run_workers( # Start the driver worker task after all the ray workers'. if not use_dummy_driver: # Driver task will run in this python process - if run_driver_in_background_thread: - if not ray_worker_outputs: - # no background thread required when there are - # no concurrent worker tasks - driver_output = self.driver_worker.execute_method( - method, *driver_args, **driver_kwargs) - all_worker_outputs = [driver_output] - else: - # Poll driver and worker tasks concurrently - # in background threads - with concurrent.futures.ThreadPoolExecutor( - max_workers=2) as executor: - driver_poll_thread = executor.submit( - self.driver_worker.execute_method, method, - *driver_args, **driver_kwargs) - worker_poll_thread = executor.submit( - ray.get, ray_worker_outputs) - - for completed_future in concurrent.futures.as_completed( - [driver_poll_thread, worker_poll_thread]): - # Will raise exception if underlying thread raises - res = completed_future.result() - if not isinstance(res, list): - driver_output = [res] - else: - worker_outputs = res - all_worker_outputs = driver_output + worker_outputs + if run_driver_in_background_thread and ray_worker_outputs: + # If ray worker tasks exist, poll driver and worker + # tasks concurrently in background threads + with futures.ThreadPoolExecutor(max_workers=2) as executor: + driver_poll_thread = executor.submit( + self.driver_worker.execute_method, method, + *driver_args, **driver_kwargs) + worker_poll_thread = executor.submit( + ray.get, ray_worker_outputs) + + for completed_future in futures.as_completed( + [driver_poll_thread, worker_poll_thread]): + # Will raise exception if + # underlying thread raises + res = completed_future.result() + if not isinstance(res, list): + driver_output = [res] + else: + worker_outputs = res + all_worker_outputs = driver_output + worker_outputs else: driver_output = self.driver_worker.execute_method( method, *driver_args, **driver_kwargs) From 4dbdf4c42aaf49646010985edb914c11022d1786 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:49:23 -0700 Subject: [PATCH 17/19] improve comments --- vllm/executor/ray_gpu_executor.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 0c5a1e555bd52..cff9328264b8a 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -366,9 +366,6 @@ def _run_workers( # so we only explicitly execute on the driver worker if using a # non-SPMD worker class. if not self.use_ray_spmd_worker: - # Concurrently poll driver worker and remote ray workers - # to avoid deadlock when performing distributed init - # (see: https://github.com/vllm-project/vllm/issues/3455) driver_args = args if all_args is None else all_args[0] driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] @@ -377,7 +374,13 @@ def _run_workers( # Driver task will run in this python process if run_driver_in_background_thread and ray_worker_outputs: # If ray worker tasks exist, poll driver and worker - # tasks concurrently in background threads + # tasks concurrently in background threads. + # + # This can avoid deadlock if the driver task is + # blocking on some out of band comm that is invalidated + # by a Ray worker exception. + # + # See: https://github.com/vllm-project/vllm/issues/3455 with futures.ThreadPoolExecutor(max_workers=2) as executor: driver_poll_thread = executor.submit( self.driver_worker.execute_method, method, From 2f6cdf7bea31f0c20f8f932f1a44354cdfa2a7e3 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 07:53:00 -0700 Subject: [PATCH 18/19] minor improve --- vllm/executor/ray_gpu_executor.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index cff9328264b8a..cb8ef73d7e0f4 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -377,8 +377,8 @@ def _run_workers( # tasks concurrently in background threads. # # This can avoid deadlock if the driver task is - # blocking on some out of band comm that is invalidated - # by a Ray worker exception. + # blocking on some out of band comm (e.g. torch.dist.init) + # that is invalidated by a Ray worker exception. # # See: https://github.com/vllm-project/vllm/issues/3455 with futures.ThreadPoolExecutor(max_workers=2) as executor: @@ -390,8 +390,7 @@ def _run_workers( for completed_future in futures.as_completed( [driver_poll_thread, worker_poll_thread]): - # Will raise exception if - # underlying thread raises + # Will raise exception if underlying thread raises res = completed_future.result() if not isinstance(res, list): driver_output = [res] From 7b1bc1fca5d6ce7c496cf174c652b142fd3d7139 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 5 Aug 2024 08:01:11 -0700 Subject: [PATCH 19/19] minor --- vllm/executor/ray_gpu_executor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index cb8ef73d7e0f4..7b9f5e6ce0fe9 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -241,7 +241,8 @@ def sort_by_driver_then_worker_ip(worker): self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) # must run driver in background thread if len(workers) > 0 to avoid - # NCCL init deadlock (https://github.com/vllm-project/vllm/pull/7159) + # distributed init deadlock. + # (https://github.com/vllm-project/vllm/pull/7159) self._run_workers("init_device", run_driver_in_background_thread=True) self._run_workers("load_model", max_concurrent_workers=self.parallel_config. @@ -373,14 +374,15 @@ def _run_workers( if not use_dummy_driver: # Driver task will run in this python process if run_driver_in_background_thread and ray_worker_outputs: - # If ray worker tasks exist, poll driver and worker - # tasks concurrently in background threads. + # Poll driver and worker tasks concurrently in background + # threads. # # This can avoid deadlock if the driver task is # blocking on some out of band comm (e.g. torch.dist.init) # that is invalidated by a Ray worker exception. # # See: https://github.com/vllm-project/vllm/issues/3455 + with futures.ThreadPoolExecutor(max_workers=2) as executor: driver_poll_thread = executor.submit( self.driver_worker.execute_method, method,