diff --git a/tests/entrypoints/openai/rpc/test_zmq_client.py b/tests/entrypoints/openai/rpc/test_zmq_client.py index 631d15cd03ed7..cafd125c5a598 100644 --- a/tests/entrypoints/openai/rpc/test_zmq_client.py +++ b/tests/entrypoints/openai/rpc/test_zmq_client.py @@ -75,11 +75,12 @@ async def test_client_aborts_use_timeouts(monkeypatch, dummy_server, m.setattr(dummy_server, "abort", lambda x: None) m.setattr(client, "_data_timeout", 10) - # Ensure the client doesn't hang + # The client should suppress timeouts on `abort`s + # and return normally, assuming the server will eventually + # abort the request. client_task = asyncio.get_running_loop().create_task( client.abort("test request id")) - with pytest.raises(TimeoutError, match="Server didn't reply within"): - await asyncio.wait_for(client_task, timeout=0.05) + await asyncio.wait_for(client_task, timeout=0.05) @pytest.mark.asyncio diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 603ac19d8c04b..8e8371ef1559a 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -6,7 +6,7 @@ import re import tempfile from argparse import Namespace -from contextlib import asynccontextmanager, suppress +from contextlib import asynccontextmanager from http import HTTPStatus from typing import AsyncIterator, Optional, Set @@ -83,8 +83,7 @@ async def lifespan(app: FastAPI): async def _force_log(): while True: await asyncio.sleep(10) - with suppress(Exception): - await async_engine_client.do_log_stats() + await async_engine_client.do_log_stats() if not engine_args.disable_log_stats: task = asyncio.create_task(_force_log()) diff --git a/vllm/entrypoints/openai/rpc/client.py b/vllm/entrypoints/openai/rpc/client.py index 55b92d41975ea..dc316ca1160c6 100644 --- a/vllm/entrypoints/openai/rpc/client.py +++ b/vllm/entrypoints/openai/rpc/client.py @@ -335,7 +335,18 @@ async def _is_tracing_enabled_rpc(self) -> bool: async def abort(self, request_id: str): 
"""Send an ABORT_REQUEST signal to the RPC Server""" - with suppress(RPCClientClosedError): + + # Suppress timeouts as well. + # In cases where the server is busy processing requests and a very + # large volume of abort requests arrive, it is likely that the server + # will not be able to ack all of them in time. We have seen this when + # we abort 20k requests at once while another 2k are processing; many + # of them time out, but we see the server successfully abort all of the + # requests. + # In this case we assume that the server has received or will receive + # these abort requests, and ignore the timeout. This prevents a massive + # wall of `TimeoutError` stack traces. + with suppress(RPCClientClosedError, TimeoutError): await self._send_one_way_rpc_request( request=RPCAbortRequest(request_id), error_message=f"RPCAbortRequest {request_id} failed")