diff --git a/neuralmagic/benchmarks/scripts/backend_request_func.py b/neuralmagic/benchmarks/scripts/backend_request_func.py
index b5e0308848e25..97f643b8df4a5 100644
--- a/neuralmagic/benchmarks/scripts/backend_request_func.py
+++ b/neuralmagic/benchmarks/scripts/backend_request_func.py
@@ -8,6 +8,7 @@
 import json
 import os
 import time
+import re
 from dataclasses import dataclass
 from typing import Optional
 
@@ -30,7 +31,13 @@ class RequestFuncInput:
 
 @dataclass
 class RequestFuncOutput:
-    generated_text: str = ""
+    """
+    Populate server_response instead of generated_text when we don't
+    want to burden the async process with decoding the server response;
+    that decoding can instead happen offline.
+    """
+    generated_text: str = None
+    server_response: bytes = None
     success: bool = False
     latency: float = 0
     ttft: float = 0
@@ -98,58 +105,99 @@ async def async_request_tgi(
     return output
 
 
-async def async_request_vllm(
-    request_func_input: RequestFuncInput,
-    pbar: Optional[tqdm] = None,
-) -> RequestFuncOutput:
-    api_url = request_func_input.api_url
-    assert api_url.endswith("generate")
-
-    async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
-        payload = {
-            "prompt": request_func_input.prompt,
-            "n": 1,
-            "best_of": request_func_input.best_of,
-            "use_beam_search": request_func_input.use_beam_search,
-            # TODO (varun) : Make temperature configurable
-            #"temperature": 0.0 if request_func_input.use_beam_search else 1.0,
-            "temperature": 0.0,
-            "top_p": 1.0,
-            "max_tokens": request_func_input.output_len,
-            "ignore_eos": True,
-            "stream": True,
-        }
-        output = RequestFuncOutput()
-        output.prompt_len = request_func_input.prompt_len
-
-        ttft = 0
-        st = time.perf_counter()
-        try:
-            async with session.post(url=api_url, json=payload) as response:
-                if response.status == 200:
-                    data = None
-                    async for part_data in response.content.iter_any():
-                        if ttft == 0:
-                            ttft = time.perf_counter() - st
-                            output.ttft = ttft
-                        data = part_data
-                    output.latency = time.perf_counter() - st
-
-                    # When streaming, '\0' is appended to the end.
-                    body = trim_suffix(data.decode('utf-8'), "\0")
-                    output.generated_text = json.loads(
-                        body)["text"][0][len(request_func_input.prompt):]
-
-                    output.success = True
-
-                else:
-                    output.success = False
-        except (aiohttp.ClientOSError, aiohttp.ServerDisconnectedError):
-            output.success = False
-
-    if pbar:
-        pbar.update(1)
-    return output
+class AsyncRequestVLLM:
+
+    @staticmethod
+    def stream_server_outputs():
+        """
+        This function is queried to set the `stream` option of the
+        server JSON payload.
+        """
+        return True
+
+    @staticmethod
+    def decode_server_response(server_response: bytes, prompt_len: int) -> str:
+        """
+        Decodes the server response and returns the text generated by
+        the server in response to the prompt.
+        """
+
+        def try_json_decode(s: str) -> dict:
+            try:
+                return json.loads(s)
+            except Exception as _:
+                return None
+
+        # When streaming, '\0' is appended to the end.
+        assert (AsyncRequestVLLM.stream_server_outputs())
+        body: str = trim_suffix(server_response.decode('utf-8'), "\0")
+
+        # Most of the time the body contains only one JSON.
+        decoded_json = try_json_decode(body)
+        if decoded_json is not None:
+            return decoded_json["text"][0][prompt_len:]
+
+        # Sometimes the body contains more than one JSON. Each of these
+        # JSONs carries the text generated so far; the last one contains
+        # the entire generated text.
+        json_starts = [m.start() for m in re.finditer('{\"text\":', body)]
+        for json_start in reversed(json_starts):
+            decoded_json = try_json_decode(body[json_start:])
+            if decoded_json is not None:
+                return decoded_json["text"][0][prompt_len:]
+
+        raise ValueError(f"Cannot decode json body \n {body}")
+
+    @staticmethod
+    async def async_request_vllm(
+        request_func_input: RequestFuncInput,
+        pbar: Optional[tqdm] = None,
+    ) -> RequestFuncOutput:
+        api_url = request_func_input.api_url
+        assert api_url.endswith("generate")
+
+        async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
+            payload = {
+                "prompt": request_func_input.prompt,
+                "n": 1,
+                "best_of": request_func_input.best_of,
+                "use_beam_search": request_func_input.use_beam_search,
+                # TODO (varun) : Make temperature configurable
+                #"temperature": 0.0 if request_func_input.use_beam_search \
+                #    else 1.0,
+                "temperature": 0.0,
+                "top_p": 1.0,
+                "max_tokens": request_func_input.output_len,
+                "ignore_eos": True,
+                "stream": AsyncRequestVLLM.stream_server_outputs(),
+            }
+            output = RequestFuncOutput()
+            output.prompt_len = len(request_func_input.prompt)
+
+            ttft = 0
+            st = time.perf_counter()
+            try:
+                async with session.post(url=api_url, json=payload) as response:
+                    if response.status == 200:
+                        data = None
+                        async for part_data in response.content.iter_any():
+                            if ttft == 0:
+                                ttft = time.perf_counter() - st
+                                output.ttft = ttft
+                            data = part_data
+
+                        output.latency = time.perf_counter() - st
+                        output.server_response = data
+                        output.success = True
+
+                    else:
+                        output.success = False
+            except (aiohttp.ClientOSError, aiohttp.ServerDisconnectedError):
+                output.success = False
+
+        if pbar:
+            pbar.update(1)
+        return output
 
 
 async def async_request_trt_llm(
@@ -306,7 +354,7 @@ async def async_request_openai_completions(
 
 ASYNC_REQUEST_FUNCS = {
     "tgi": async_request_tgi,
-    "vllm": async_request_vllm,
+    "vllm": AsyncRequestVLLM.async_request_vllm,
     "deepspeed-mii": async_request_deepspeed_mii,
     "openai": async_request_openai_completions,
     "tensorrt-llm": async_request_trt_llm,
diff --git a/neuralmagic/benchmarks/scripts/benchmark_serving.py b/neuralmagic/benchmarks/scripts/benchmark_serving.py
index 6dc32e9d552ea..5e5de7ebcba93 100644
--- a/neuralmagic/benchmarks/scripts/benchmark_serving.py
+++ b/neuralmagic/benchmarks/scripts/benchmark_serving.py
@@ -41,6 +41,7 @@
 from neuralmagic.benchmarks.scripts.backend_request_func import (
     ASYNC_REQUEST_FUNCS,
+    AsyncRequestVLLM,
     RequestFuncInput,
     RequestFuncOutput,
 )
 
@@ -184,6 +185,31 @@ def calculate_metrics(
     return metrics
 
 
+def decode_generated_text(
+        backend: str,
+        outputs: List[RequestFuncOutput]) -> List[RequestFuncOutput]:
+
+    if all(
+            map(
+                lambda output: not output.success or output.generated_text is
+                not None, outputs)):
+        # Nothing to do
+        return outputs
+
+    # At the moment, all backend request functions except async_request_vllm
+    # report generated_text directly.
+    assert backend == "vllm"
+    assert all(
+        map(
+            lambda output: not output.success or output.server_response is
+            not None, outputs))
+
+    for output in outputs:
+        output.generated_text = AsyncRequestVLLM.decode_server_response(
+            output.server_response, output.prompt_len)
+    return outputs
+
+
 async def benchmark(backend: str, api_url: str, model_id: str,
                     tokenizer: PreTrainedTokenizerBase,
                     input_requests: List[Tuple[str, int, int]], best_of: int,
@@ -217,6 +243,7 @@
                 request_func(request_func_input=request_func_input,
                              pbar=pbar)))
     outputs = await asyncio.gather(*tasks)
+    outputs = decode_generated_text(backend, outputs)
 
     if not disable_tqdm:
         pbar.close()
diff --git a/neuralmagic/benchmarks/scripts/common.py b/neuralmagic/benchmarks/scripts/common.py
index 3229344996583..5757592688268 100644
--- a/neuralmagic/benchmarks/scripts/common.py
+++ b/neuralmagic/benchmarks/scripts/common.py
@@ -13,7 +13,7 @@
 from vllm.transformers_utils.tokenizer import get_tokenizer
 from .datasets_registry import SHAREGPT_PATH, SHAREGPT_DOWNLOAD_STR
 from .backend_request_func import (RequestFuncInput, RequestFuncOutput,
-                                   async_request_vllm)
+                                   AsyncRequestVLLM)
 from ...tools.call_cmd import call_cmd
 
 
@@ -169,7 +169,8 @@ async def process_requests(input_requests):
             )
             tasks.append(
                 asyncio.create_task(
-                    async_request_vllm(request_func_input=request_func_input)))
+                    AsyncRequestVLLM.async_request_vllm(
+                        request_func_input=request_func_input)))
         _ = await asyncio.gather(*tasks)
 
     tokenizer = get_tokenizer(model)
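Usage sketch (illustrative only, not applied by the diff): how the offline decode introduced above is intended to behave. The response byte strings below are hypothetical stand-ins for streamed vLLM /generate output; only the class added in this diff is exercised.

# Illustrative sketch; the response bytes are made-up examples, not real server output.
from neuralmagic.benchmarks.scripts.backend_request_func import AsyncRequestVLLM

prompt = "Hello"

# Common case: the streamed body holds a single JSON plus the trailing '\0'.
single = b'{"text": ["Hello world"]}\x00'
assert AsyncRequestVLLM.decode_server_response(single, len(prompt)) == " world"

# Fallback case: several concatenated JSONs; the last one carries the full text.
multi = b'{"text": ["Hello wor"]}\x00{"text": ["Hello world!"]}\x00'
assert AsyncRequestVLLM.decode_server_response(multi, len(prompt)) == " world!"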