From d76d41d334f93baf89164cbe4d7f62d6b5981102 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 18 Sep 2024 17:49:57 -0600 Subject: [PATCH 01/10] :recycle: Move health checks to separate thread Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/__init__.py | 3 +- vllm/engine/multiprocessing/client.py | 30 ++++++----- vllm/engine/multiprocessing/engine.py | 71 ++++++++++++++++++++----- 3 files changed, 79 insertions(+), 25 deletions(-) diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py index ba5c6e15fc821..2d78dc8827b54 100644 --- a/vllm/engine/multiprocessing/__init__.py +++ b/vllm/engine/multiprocessing/__init__.py @@ -12,7 +12,8 @@ IPC_INPUT_EXT = "_input_socket" IPC_OUTPUT_EXT = "_output_socket" -IPC_HEALTH_EXT = "_health_socket" +IPC_HEALTH_OUT_EXT = "_health_out_socket" +IPC_HEALTH_IN_EXT = "_health_in_socket" IPC_DATA_EXT = "_data_socket" diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 2cb4de79131f1..b714cad3f75be 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -16,12 +16,12 @@ # yapf conflicts with isort for this block # yapf: disable from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT, - IPC_HEALTH_EXT, IPC_INPUT_EXT, - IPC_OUTPUT_EXT, RPC_REQUEST_T, - VLLM_RPC_SUCCESS_STR, RPCAbortRequest, - RPCError, RPCGenerateRequest, - RPCHealthRequest, RPCStartupRequest, - RPCStartupResponse) + IPC_HEALTH_IN_EXT, IPC_HEALTH_OUT_EXT, + IPC_INPUT_EXT, IPC_OUTPUT_EXT, + RPC_REQUEST_T, VLLM_RPC_SUCCESS_STR, + RPCAbortRequest, RPCError, + RPCGenerateRequest, RPCHealthRequest, + RPCStartupRequest, RPCStartupResponse) # yapf: enable from vllm.envs import VLLM_RPC_TIMEOUT from vllm.inputs import PromptInputs @@ -95,8 +95,14 @@ def __init__(self, ipc_path: str, engine_config: EngineConfig): self.output_socket.connect(f"{ipc_path}{IPC_OUTPUT_EXT}") # IPC path for ack of check_health requests. - self.health_socket: Socket = self.context.socket(zmq.constants.PULL) - self.health_socket.connect(f"{ipc_path}{IPC_HEALTH_EXT}") + self.health_req_socket: Socket = self.context.socket( + zmq.constants.PUSH) + self.health_req_socket.connect(f"{ipc_path}{IPC_HEALTH_IN_EXT}") + + # IPC path for ack of check_health requests. + self.health_ack_socket: Socket = self.context.socket( + zmq.constants.PULL) + self.health_ack_socket.connect(f"{ipc_path}{IPC_HEALTH_OUT_EXT}") # IPC path for the data socket. self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}" @@ -148,19 +154,19 @@ async def run_check_health_loop(self, timeout: int): try: while True: - if await self.health_socket.poll(timeout=timeout) == 0: + if await self.health_ack_socket.poll(timeout=timeout) == 0: # Wakeup every N seconds and do a health probe. await self._send_one_way_rpc_request( - RPCHealthRequest(), self.input_socket) + RPCHealthRequest(), self.health_req_socket) # Wait for ack from the health socket. await self._await_ack(error_message="Health check failed.", - socket=self.health_socket) + socket=self.health_ack_socket) else: # Server sent a health status message unprompted. await self._check_success( error_message="Health check failed.", - socket=self.health_socket) + socket=self.health_ack_socket) logger.debug("Health probe successful.") diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 70cd6e5cb6000..941a5df37dbf6 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -1,5 +1,7 @@ import pickle import signal +import threading +import time from contextlib import contextmanager from typing import Iterator, List, Optional, Union @@ -12,8 +14,9 @@ # yapf conflicts with isort for this block # yapf: disable from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT, - IPC_HEALTH_EXT, IPC_INPUT_EXT, - IPC_OUTPUT_EXT, REQUEST_OUTPUTS_T, + IPC_HEALTH_IN_EXT, IPC_HEALTH_OUT_EXT, + IPC_INPUT_EXT, IPC_OUTPUT_EXT, + REQUEST_OUTPUTS_T, VLLM_RPC_SUCCESS_STR, RPCAbortRequest, RPCError, RPCGenerateRequest, RPCHealthRequest, RPCStartupRequest, @@ -84,9 +87,13 @@ def __init__(self, self.output_socket = self.ctx.socket(zmq.constants.PUSH) self.output_socket.bind(f"{ipc_path}{IPC_OUTPUT_EXT}") + # Receive health requests from the client + self.health_in_socket = self.ctx.socket(zmq.constants.PULL) + self.health_in_socket.bind(f"{ipc_path}{IPC_HEALTH_IN_EXT}") + # Send health status back to client. - self.health_socket = self.ctx.socket(zmq.constants.PUSH) - self.health_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}") + self.health_out_socket = self.ctx.socket(zmq.constants.PUSH) + self.health_out_socket.bind(f"{ipc_path}{IPC_HEALTH_OUT_EXT}") # IPC path for the data socket. self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}" @@ -94,6 +101,13 @@ def __init__(self, # Error state. self._errored_with: Optional[BaseException] = None + # Health checking thread + self.health_check_thread = threading.Thread( + target=self._health_check_loop) + self._health_check_stop_event = threading.Event() + + self._last_alive_time = time.time() + @property def dead_error(self) -> BaseException: if self._errored_with is not None: @@ -124,6 +138,8 @@ def start(self): try: logger.debug("Starting Startup Loop.") self.run_startup_loop() + logger.debug("Starting health check thread") + self.health_check_thread.start() logger.debug("Starting Engine Loop.") self.run_engine_loop() except Exception as e: @@ -134,9 +150,25 @@ def start(self): logger.debug("MQLLMEngine is shut down.") self.cleanup() + def _health_check_loop(self): + while True: + if self.health_in_socket.poll(timeout=1000) != 0: + frames = self.health_in_socket.recv_multipart(copy=False) + request = pickle.loads(frames[0].buffer) + if not isinstance(request, RPCHealthRequest): + logger.warning( + "Received a non-health request request on " + "the health request port: %s", str(type(request))) + + self._handle_health_request() + if self._health_check_stop_event.is_set(): + logger.debug("Exiting MQLLMEngine health check thread") + return + def cleanup(self): """Cleanup zeromq state on shutdown.""" # Closes all sockets and destroys context. + self._health_check_stop_event.set() self.ctx.destroy(linger=0) del self.engine @@ -175,9 +207,11 @@ def run_engine_loop(self): """Core busy loop of the LLMEngine.""" while True: + self._alive() if not self.engine.has_unfinished_requests(): # Poll until there is work to do. while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0: + self._alive() self.engine.do_log_stats() logger.debug("Waiting for new requests in engine loop.") @@ -193,7 +227,7 @@ def run_engine_loop(self): def engine_step(self) -> List[RequestOutput]: """Engine step wrapper with error handling.""" - + self._alive() try: return self.engine.step() except SystemExit: @@ -210,6 +244,7 @@ def handle_new_input(self): """Handle new input from the socket""" try: while self.input_socket.poll(timeout=0) != 0: + self._alive() frames = self.input_socket.recv_multipart(copy=False) request = pickle.loads(frames[0].buffer) @@ -221,10 +256,8 @@ def handle_new_input(self): self._handle_generate_request(request) elif isinstance(request, RPCAbortRequest): self._handle_abort_request(request) - elif isinstance(request, RPCHealthRequest): - self._handle_health_request() else: - raise ValueError("Unknown RPCRequest Type: {request}") + raise ValueError(f"Unknown RPCRequest Type: {request}") except Exception as e: self._set_errored(e) @@ -272,12 +305,23 @@ def _handle_abort_request(self, request: RPCAbortRequest): logger.info("Aborted request %s.", request.request_id) def _handle_health_request(self): + # Send unhealthy if engine has already errored if self._errored_with is not None: self._send_unhealthy(self._errored_with) + # Check for life of the main loop + if time.time() - self._last_alive_time > 30: + self._send_unhealthy(RuntimeError("Engine loop has died")) + # Raises error if unhealthy. - self.engine.check_health() - self._send_healthy() + try: + self.engine.check_health() + self._send_healthy() + except Exception as e: + # TODO: set errored_with here? + # Does it matter if we're just gonna kill the engine? + # Assume requests will be failing anyway if engine is unhealthy? + self._send_unhealthy(e) def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): """Send List of RequestOutput to RPCClient.""" @@ -287,12 +331,12 @@ def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): def _send_healthy(self): """Send HEALTHY message to RPCClient.""" - self.health_socket.send_multipart(HEALTHY_RESPONSE, copy=False) + self.health_out_socket.send_multipart(HEALTHY_RESPONSE, copy=False) def _send_unhealthy(self, error: BaseException): """Send UNHEALTHY message to RPCClient.""" error_bytes = pickle.dumps(error) - self.health_socket.send_multipart((error_bytes, ), copy=False) + self.health_out_socket.send_multipart((error_bytes, ), copy=False) def _async_socket_engine_callback(self, request_outputs: REQUEST_OUTPUTS_T): @@ -305,6 +349,9 @@ def _set_errored(self, e: BaseException): if self._errored_with is None: self._errored_with = e + def _alive(self): + self._last_alive_time = time.time() + def run_mp_engine(engine_args: AsyncEngineArgs, usage_context: UsageContext, ipc_path: str): From 95d2028294fba2aae41ddc4c9e0df7c0f2664482 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 10:28:27 -0600 Subject: [PATCH 02/10] :recycle: Change to heartbeat Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/__init__.py | 10 +---- vllm/engine/multiprocessing/client.py | 53 ++++++++++--------------- vllm/engine/multiprocessing/engine.py | 50 +++++++++-------------- 3 files changed, 41 insertions(+), 72 deletions(-) diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py index 2d78dc8827b54..f65a781a4d5e9 100644 --- a/vllm/engine/multiprocessing/__init__.py +++ b/vllm/engine/multiprocessing/__init__.py @@ -12,8 +12,7 @@ IPC_INPUT_EXT = "_input_socket" IPC_OUTPUT_EXT = "_output_socket" -IPC_HEALTH_OUT_EXT = "_health_out_socket" -IPC_HEALTH_IN_EXT = "_health_in_socket" +IPC_HEALTH_EXT = "_health_socket" IPC_DATA_EXT = "_data_socket" @@ -43,10 +42,6 @@ class RPCAbortRequest: request_id: str -class RPCHealthRequest: - pass - - class RPCStartupRequest(Enum): IS_SERVER_READY = 1 @@ -56,8 +51,7 @@ class RPCStartupResponse: tracing_enabled: bool -RPC_REQUEST_T = Union[RPCGenerateRequest, RPCAbortRequest, RPCHealthRequest, - RPCStartupRequest] +RPC_REQUEST_T = Union[RPCGenerateRequest, RPCAbortRequest, RPCStartupRequest] REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCError] diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index b714cad3f75be..7d9f5c9583fe1 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -16,11 +16,10 @@ # yapf conflicts with isort for this block # yapf: disable from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT, - IPC_HEALTH_IN_EXT, IPC_HEALTH_OUT_EXT, - IPC_INPUT_EXT, IPC_OUTPUT_EXT, - RPC_REQUEST_T, VLLM_RPC_SUCCESS_STR, - RPCAbortRequest, RPCError, - RPCGenerateRequest, RPCHealthRequest, + IPC_HEALTH_EXT, IPC_INPUT_EXT, + IPC_OUTPUT_EXT, RPC_REQUEST_T, + VLLM_RPC_SUCCESS_STR, RPCAbortRequest, + RPCError, RPCGenerateRequest, RPCStartupRequest, RPCStartupResponse) # yapf: enable from vllm.envs import VLLM_RPC_TIMEOUT @@ -94,15 +93,9 @@ def __init__(self, ipc_path: str, engine_config: EngineConfig): self.output_socket: Socket = self.context.socket(zmq.constants.PULL) self.output_socket.connect(f"{ipc_path}{IPC_OUTPUT_EXT}") - # IPC path for ack of check_health requests. - self.health_req_socket: Socket = self.context.socket( - zmq.constants.PUSH) - self.health_req_socket.connect(f"{ipc_path}{IPC_HEALTH_IN_EXT}") - - # IPC path for ack of check_health requests. - self.health_ack_socket: Socket = self.context.socket( - zmq.constants.PULL) - self.health_ack_socket.connect(f"{ipc_path}{IPC_HEALTH_OUT_EXT}") + # IPC path for acking heartbeats. + self.heartbeat_socket: Socket = self.context.socket(zmq.constants.PULL) + self.heartbeat_socket.connect(f"{ipc_path}{IPC_HEALTH_EXT}") # IPC path for the data socket. self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}" @@ -142,31 +135,25 @@ def get_data_socket(self) -> Iterator[Socket]: socket.close(linger=0) async def run_check_health_loop(self, timeout: int): - """Background loop that continually probes the RPCServer for health. - - The loop sends CHECK_HEALTH requests to the INPUT_SOCKET, which - the MQLLMEngine server is blocking on. - - The Server replies on the HEALTH_SOCKET (rather than on the - OUTPUT_SOCKET such that the messages are not intermingled with - output streaming). + """Background loop that continually listens to the RPCServer for + heartbeats. """ - try: while True: - if await self.health_ack_socket.poll(timeout=timeout) == 0: - # Wakeup every N seconds and do a health probe. - await self._send_one_way_rpc_request( - RPCHealthRequest(), self.health_req_socket) - - # Wait for ack from the health socket. - await self._await_ack(error_message="Health check failed.", - socket=self.health_ack_socket) + if await self.heartbeat_socket.poll(timeout=timeout) == 0: + # No heartbeat was received. Set error and exit the loop + self._set_errored( + TimeoutError("No heartbeat received " + "from MQLLMEngine")) + logger.debug("Shutting down MQLLMEngineClient check " + "health loop due to timeout") + break + else: - # Server sent a health status message unprompted. + # Heartbeat received- check the message await self._check_success( error_message="Health check failed.", - socket=self.health_ack_socket) + socket=self.heartbeat_socket) logger.debug("Health probe successful.") diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 941a5df37dbf6..ce8a4c951eee8 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -14,13 +14,11 @@ # yapf conflicts with isort for this block # yapf: disable from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT, - IPC_HEALTH_IN_EXT, IPC_HEALTH_OUT_EXT, - IPC_INPUT_EXT, IPC_OUTPUT_EXT, - REQUEST_OUTPUTS_T, + IPC_HEALTH_EXT, IPC_INPUT_EXT, + IPC_OUTPUT_EXT, REQUEST_OUTPUTS_T, VLLM_RPC_SUCCESS_STR, RPCAbortRequest, RPCError, RPCGenerateRequest, - RPCHealthRequest, RPCStartupRequest, - RPCStartupResponse) + RPCStartupRequest, RPCStartupResponse) # yapf: enable from vllm.logger import init_logger from vllm.outputs import RequestOutput @@ -32,6 +30,7 @@ logger = init_logger(__name__) POLLING_TIMEOUT_MS = 10000 +HEARTBEAT_INTERVAL_MS = 1000 HEALTHY_RESPONSE = (pickle.dumps(VLLM_RPC_SUCCESS_STR), ) @@ -87,13 +86,9 @@ def __init__(self, self.output_socket = self.ctx.socket(zmq.constants.PUSH) self.output_socket.bind(f"{ipc_path}{IPC_OUTPUT_EXT}") - # Receive health requests from the client - self.health_in_socket = self.ctx.socket(zmq.constants.PULL) - self.health_in_socket.bind(f"{ipc_path}{IPC_HEALTH_IN_EXT}") - - # Send health status back to client. - self.health_out_socket = self.ctx.socket(zmq.constants.PUSH) - self.health_out_socket.bind(f"{ipc_path}{IPC_HEALTH_OUT_EXT}") + # Send heartbeats back to client. + self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH) + self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}") # IPC path for the data socket. self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}" @@ -103,7 +98,7 @@ def __init__(self, # Health checking thread self.health_check_thread = threading.Thread( - target=self._health_check_loop) + target=self._heartbeat_loop) self._health_check_stop_event = threading.Event() self._last_alive_time = time.time() @@ -150,21 +145,6 @@ def start(self): logger.debug("MQLLMEngine is shut down.") self.cleanup() - def _health_check_loop(self): - while True: - if self.health_in_socket.poll(timeout=1000) != 0: - frames = self.health_in_socket.recv_multipart(copy=False) - request = pickle.loads(frames[0].buffer) - if not isinstance(request, RPCHealthRequest): - logger.warning( - "Received a non-health request request on " - "the health request port: %s", str(type(request))) - - self._handle_health_request() - if self._health_check_stop_event.is_set(): - logger.debug("Exiting MQLLMEngine health check thread") - return - def cleanup(self): """Cleanup zeromq state on shutdown.""" # Closes all sockets and destroys context. @@ -304,7 +284,15 @@ def _handle_abort_request(self, request: RPCAbortRequest): if self.log_requests: logger.info("Aborted request %s.", request.request_id) - def _handle_health_request(self): + def _heartbeat_loop(self): + while not self._health_check_stop_event.wait( + timeout=HEARTBEAT_INTERVAL_MS): + # Loops every `HEARTBEAT_INTERVAL_MS` until the stop event is set + self._heartbeat() + + logger.debug("Exiting MQLLMEngine health check thread") + + def _heartbeat(self): # Send unhealthy if engine has already errored if self._errored_with is not None: self._send_unhealthy(self._errored_with) @@ -331,12 +319,12 @@ def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): def _send_healthy(self): """Send HEALTHY message to RPCClient.""" - self.health_out_socket.send_multipart(HEALTHY_RESPONSE, copy=False) + self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False) def _send_unhealthy(self, error: BaseException): """Send UNHEALTHY message to RPCClient.""" error_bytes = pickle.dumps(error) - self.health_out_socket.send_multipart((error_bytes, ), copy=False) + self.heartbeat_socket.send_multipart((error_bytes, ), copy=False) def _async_socket_engine_callback(self, request_outputs: REQUEST_OUTPUTS_T): From b01690d2fd397ca781c07df61c9deb680a49b359 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 10:55:22 -0600 Subject: [PATCH 03/10] :recycle: more health_check -> heartbeat Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/engine.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index ce8a4c951eee8..403fadeb8dc05 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -96,10 +96,9 @@ def __init__(self, # Error state. self._errored_with: Optional[BaseException] = None - # Health checking thread - self.health_check_thread = threading.Thread( - target=self._heartbeat_loop) - self._health_check_stop_event = threading.Event() + # Heartbeat thread + self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop) + self._heartbeat_stop_event = threading.Event() self._last_alive_time = time.time() @@ -133,8 +132,8 @@ def start(self): try: logger.debug("Starting Startup Loop.") self.run_startup_loop() - logger.debug("Starting health check thread") - self.health_check_thread.start() + logger.debug("Starting heartbeat thread") + self.heartbeat_thread.start() logger.debug("Starting Engine Loop.") self.run_engine_loop() except Exception as e: @@ -148,7 +147,7 @@ def start(self): def cleanup(self): """Cleanup zeromq state on shutdown.""" # Closes all sockets and destroys context. - self._health_check_stop_event.set() + self._heartbeat_stop_event.set() self.ctx.destroy(linger=0) del self.engine @@ -285,12 +284,12 @@ def _handle_abort_request(self, request: RPCAbortRequest): logger.info("Aborted request %s.", request.request_id) def _heartbeat_loop(self): - while not self._health_check_stop_event.wait( + while not self._heartbeat_stop_event.wait( timeout=HEARTBEAT_INTERVAL_MS): # Loops every `HEARTBEAT_INTERVAL_MS` until the stop event is set self._heartbeat() - logger.debug("Exiting MQLLMEngine health check thread") + logger.debug("Exiting MQLLMEngine heartbeat thread") def _heartbeat(self): # Send unhealthy if engine has already errored From 3fcd3b381fa8df1594ed471ef6c2c42c57627379 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 11:54:17 -0600 Subject: [PATCH 04/10] :bug: set heartbeat interval based on rpc timeout Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/client.py | 8 ++++---- vllm/engine/multiprocessing/engine.py | 9 ++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 775d3160b104f..7e397cf408fba 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -123,7 +123,7 @@ def get_data_socket(self) -> Iterator[Socket]: finally: socket.close(linger=0) - async def run_check_health_loop(self, timeout: int): + async def run_heartbeat_loop(self, timeout: int): """Background loop that continually listens to the RPCServer for heartbeats. """ @@ -141,10 +141,10 @@ async def run_check_health_loop(self, timeout: int): else: # Heartbeat received- check the message await self._check_success( - error_message="Health check failed.", + error_message="Heartbeat failed.", socket=self.heartbeat_socket) - logger.debug("Health probe successful.") + logger.debug("Heartbeat successful.") except asyncio.CancelledError: logger.debug("Shutting down MQLLMEngineClient check health loop.") @@ -227,7 +227,7 @@ async def setup(self): # Start health_loop. self.health_loop = asyncio.create_task( - self.run_check_health_loop(timeout=VLLM_RPC_TIMEOUT)) + self.run_heartbeat_loop(timeout=VLLM_RPC_TIMEOUT)) def close(self): """Destroy the ZeroMQ Context.""" diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index ca7318a98310b..9abb8e82869c6 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -20,6 +20,7 @@ RPCError, RPCProcessRequest, RPCStartupRequest, RPCStartupResponse) # yapf: enable +from vllm.envs import VLLM_RPC_TIMEOUT from vllm.logger import init_logger from vllm.outputs import RequestOutput from vllm.usage.usage_lib import UsageContext @@ -30,7 +31,6 @@ logger = init_logger(__name__) POLLING_TIMEOUT_MS = 10000 -HEARTBEAT_INTERVAL_MS = 1000 HEALTHY_RESPONSE = (pickle.dumps(VLLM_RPC_SUCCESS_STR), ) @@ -99,6 +99,9 @@ def __init__(self, # Heartbeat thread self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop) self._heartbeat_stop_event = threading.Event() + # The heartbeat needs to be faster than what the client will wait for + # The VLLM_RPC_TIMEOUT duration is in ms, and we need one in seconds + self.heartbeat_interval_seconds = VLLM_RPC_TIMEOUT / 3000.0 self._last_alive_time = time.time() @@ -286,8 +289,8 @@ def _handle_abort_request(self, request: RPCAbortRequest): def _heartbeat_loop(self): while not self._heartbeat_stop_event.wait( - timeout=HEARTBEAT_INTERVAL_MS): - # Loops every `HEARTBEAT_INTERVAL_MS` until the stop event is set + timeout=self.heartbeat_interval_seconds): + # Loops until the stop event is set self._heartbeat() logger.debug("Exiting MQLLMEngine heartbeat thread") From 33ffbf8d5999fff1448836aefc9fd850f034522c Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 11:59:48 -0600 Subject: [PATCH 05/10] :bug: turn up heartbeats Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 9abb8e82869c6..308cd99173491 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -101,7 +101,7 @@ def __init__(self, self._heartbeat_stop_event = threading.Event() # The heartbeat needs to be faster than what the client will wait for # The VLLM_RPC_TIMEOUT duration is in ms, and we need one in seconds - self.heartbeat_interval_seconds = VLLM_RPC_TIMEOUT / 3000.0 + self.heartbeat_interval_seconds = VLLM_RPC_TIMEOUT / 5000.0 self._last_alive_time = time.time() From 15db07c328a50eafd0b9af5f19081e9b343b624c Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 16:07:40 -0600 Subject: [PATCH 06/10] :recycle: refactor failed abort test Signed-off-by: Joe Runde --- tests/mq_llm_engine/test_error_handling.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/tests/mq_llm_engine/test_error_handling.py b/tests/mq_llm_engine/test_error_handling.py index 49cfc5aa04c36..76b2f494d5b25 100644 --- a/tests/mq_llm_engine/test_error_handling.py +++ b/tests/mq_llm_engine/test_error_handling.py @@ -153,27 +153,20 @@ async def test_failed_abort(tmp_socket): await client.check_health() # Trigger an abort on the client side. - async def bad_abort_after_2s(): - await asyncio.sleep(2.0) - await client.abort(request_id="foo") + # This request ID does not exist, and will cause the engine to error + await client.abort(request_id="foo") - # Trigger an abort in 2s from now. - abort_task = asyncio.create_task(bad_abort_after_2s()) - - # Exception in abort() will happen during this generation. - # This will kill the engine and should return ENGINE_DEAD_ERROR + # Future generation requests will now fail # with reference to the original KeyError("foo") with pytest.raises(MQEngineDeadError) as execinfo: async for _ in client.generate( inputs="Hello my name is", - sampling_params=SamplingParams(max_tokens=2000), + sampling_params=SamplingParams(max_tokens=10), request_id=uuid.uuid4()): pass assert "KeyError" in repr(execinfo.value) assert client.errored - await abort_task - # This should raise the original error. with pytest.raises(RAISED_ERROR): await client.check_health() From 624c64075d65ed13a0a2c22a04b95a05406a5555 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 16:14:54 -0600 Subject: [PATCH 07/10] :recycle: set last alive threshold based on client timeout Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/engine.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 308cd99173491..5c68d34de9b51 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -104,6 +104,10 @@ def __init__(self, self.heartbeat_interval_seconds = VLLM_RPC_TIMEOUT / 5000.0 self._last_alive_time = time.time() + # The heartbeats can tolerate a long period of the engine chugging + # away at a generation request. + # The VLLM_RPC_TIMEOUT duration is in ms, and we need one in seconds + self.last_alive_threshold = VLLM_RPC_TIMEOUT * 3.0 / 1000.0 @property def dead_error(self) -> BaseException: @@ -301,7 +305,7 @@ def _heartbeat(self): self._send_unhealthy(self._errored_with) # Check for life of the main loop - if time.time() - self._last_alive_time > 30: + if time.time() - self._last_alive_time > self.last_alive_threshold: self._send_unhealthy(RuntimeError("Engine loop has died")) # Raises error if unhealthy. From 4703a3de0b0e54313c7f7b3ca437c26de9ec75ef Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 19 Sep 2024 16:28:24 -0600 Subject: [PATCH 08/10] :bug: Check for closed sockets on heartbeat Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/engine.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 5c68d34de9b51..892e4bbeb8203 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -326,10 +326,14 @@ def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): def _send_healthy(self): """Send HEALTHY message to RPCClient.""" + if self.heartbeat_socket.closed: + return self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False) def _send_unhealthy(self, error: BaseException): """Send UNHEALTHY message to RPCClient.""" + if self.heartbeat_socket.closed: + return error_bytes = pickle.dumps(error) self.heartbeat_socket.send_multipart((error_bytes, ), copy=False) From b135cffb0d961fa7b71bcdfb01c77fea800b3f89 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Fri, 20 Sep 2024 14:33:44 -0600 Subject: [PATCH 09/10] Update vllm/engine/multiprocessing/engine.py Co-authored-by: Nick Hill --- vllm/engine/multiprocessing/engine.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 892e4bbeb8203..e2dd38dd9e37e 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -326,9 +326,8 @@ def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): def _send_healthy(self): """Send HEALTHY message to RPCClient.""" - if self.heartbeat_socket.closed: - return - self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False) + if not self.heartbeat_socket.closed: + self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False) def _send_unhealthy(self, error: BaseException): """Send UNHEALTHY message to RPCClient.""" From 10b646dbc8e27f1c207672deae7cc146579e3eee Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Fri, 20 Sep 2024 14:45:38 -0600 Subject: [PATCH 10/10] :art: fixes from review comments Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/engine.py | 35 +++++++++++++-------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index e2dd38dd9e37e..34a7b90ddf462 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -97,7 +97,8 @@ def __init__(self, self._errored_with: Optional[BaseException] = None # Heartbeat thread - self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop) + self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, + daemon=True) self._heartbeat_stop_event = threading.Event() # The heartbeat needs to be faster than what the client will wait for # The VLLM_RPC_TIMEOUT duration is in ms, and we need one in seconds @@ -213,7 +214,6 @@ def run_engine_loop(self): def engine_step(self) -> List[RequestOutput]: """Engine step wrapper with error handling.""" - self._alive() try: return self.engine.step() except SystemExit: @@ -230,7 +230,6 @@ def handle_new_input(self): """Handle new input from the socket""" try: while self.input_socket.poll(timeout=0) != 0: - self._alive() frames = self.input_socket.recv_multipart(copy=False) request = pickle.loads(frames[0].buffer) @@ -244,7 +243,8 @@ def handle_new_input(self): elif isinstance(request, RPCAbortRequest): self._handle_abort_request(request) else: - raise ValueError(f"Unknown RPCRequest Type: {request}") + raise ValueError("Unknown RPCRequest Type: " + f"{type(request)}") except Exception as e: self._set_errored(e) @@ -305,18 +305,18 @@ def _heartbeat(self): self._send_unhealthy(self._errored_with) # Check for life of the main loop - if time.time() - self._last_alive_time > self.last_alive_threshold: + elif time.time() - self._last_alive_time > self.last_alive_threshold: self._send_unhealthy(RuntimeError("Engine loop has died")) - # Raises error if unhealthy. - try: - self.engine.check_health() - self._send_healthy() - except Exception as e: - # TODO: set errored_with here? - # Does it matter if we're just gonna kill the engine? - # Assume requests will be failing anyway if engine is unhealthy? - self._send_unhealthy(e) + else: + # Otherwise- check health of the engine + # self.engine.check_health() raises on unhealthy + try: + self.engine.check_health() + self._send_healthy() + except Exception as e: + self._set_errored(e) + self._send_unhealthy(e) def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): """Send List of RequestOutput to RPCClient.""" @@ -331,10 +331,9 @@ def _send_healthy(self): def _send_unhealthy(self, error: BaseException): """Send UNHEALTHY message to RPCClient.""" - if self.heartbeat_socket.closed: - return - error_bytes = pickle.dumps(error) - self.heartbeat_socket.send_multipart((error_bytes, ), copy=False) + if not self.heartbeat_socket.closed: + error_bytes = pickle.dumps(error) + self.heartbeat_socket.send_multipart((error_bytes, ), copy=False) def _async_socket_engine_callback(self, request_outputs: REQUEST_OUTPUTS_T):