This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Benchmark Fix : Fix JSON decode error #142

Merged 6 commits on Mar 26, 2024.
156 changes: 102 additions & 54 deletions neuralmagic/benchmarks/scripts/backend_request_func.py
@@ -8,6 +8,7 @@
import json
import os
import time
import re
from dataclasses import dataclass
from typing import Optional

@@ -30,7 +31,13 @@ class RequestFuncInput:

@dataclass
class RequestFuncOutput:
    generated_text: str = ""
    """
    Populate server_response instead of generated_text if we don't want to
    burden the async process with decoding the server_response. The decoding
    can instead happen offline.
    """
    generated_text: str = None
    server_response: bytes = None
    success: bool = False
    latency: float = 0
    ttft: float = 0
@@ -98,58 +105,99 @@ async def async_request_tgi(
    return output


async def async_request_vllm(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith("generate")

    async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
        payload = {
            "prompt": request_func_input.prompt,
            "n": 1,
            "best_of": request_func_input.best_of,
            "use_beam_search": request_func_input.use_beam_search,
            # TODO (varun) : Make temperature configurable
            # "temperature": 0.0 if request_func_input.use_beam_search else 1.0,
            "temperature": 0.0,
            "top_p": 1.0,
            "max_tokens": request_func_input.output_len,
            "ignore_eos": True,
            "stream": True,
        }
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        ttft = 0
        st = time.perf_counter()
        try:
            async with session.post(url=api_url, json=payload) as response:
                if response.status == 200:
                    data = None
                    async for part_data in response.content.iter_any():
                        if ttft == 0:
                            ttft = time.perf_counter() - st
                            output.ttft = ttft
                        data = part_data
                    output.latency = time.perf_counter() - st

                    # When streaming, '\0' is appended to the end.
                    body = trim_suffix(data.decode('utf-8'), "\0")
                    output.generated_text = json.loads(
                        body)["text"][0][len(request_func_input.prompt):]

                    output.success = True

                else:
                    output.success = False
        except (aiohttp.ClientOSError, aiohttp.ServerDisconnectedError):
            output.success = False

        if pbar:
            pbar.update(1)
        return output
class AsyncRequestVLLM:

    @staticmethod
    def stream_server_outputs():
        """
        This function is queried to set the `stream` option of the
        server JSON payload.
        """
        return True

    @staticmethod
    def decode_server_response(server_response: bytes, prompt_len: int) -> str:
        """
        Decodes the server response and returns the text generated by
        the server in response to the prompt.
        """

        def try_json_decode(s: str) -> dict:
            try:
                return json.loads(s)
            except Exception as _:
                return None

        # When streaming, '\0' is appended to the end.
        assert (AsyncRequestVLLM.stream_server_outputs())
        body: str = trim_suffix(server_response.decode('utf-8'), "\0")

        # Most of the time the body contains only one JSON object.
        decoded_json = try_json_decode(body)
        if decoded_json is not None:
            return decoded_json["text"][0][prompt_len:]

        # Sometimes the body contains more than one JSON object.
        # Each of these JSONs contains generated text, and the last
        # one holds the entire generated text.
        json_starts = [m.start() for m in re.finditer('{\"text\":', body)]
        for json_start in reversed(json_starts):
            decoded_json = try_json_decode(body[json_start:])
            if decoded_json is not None:
                return decoded_json["text"][0][prompt_len:]

        raise ValueError(f"Cannot decode json body \n {body}")

Review comment: No functional change to this function except storing the response directly in the RequestFuncOutput.

    @staticmethod
    async def async_request_vllm(
        request_func_input: RequestFuncInput,
        pbar: Optional[tqdm] = None,
    ) -> RequestFuncOutput:
        api_url = request_func_input.api_url
        assert api_url.endswith("generate")

        async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
            payload = {
                "prompt": request_func_input.prompt,
                "n": 1,
                "best_of": request_func_input.best_of,
                "use_beam_search": request_func_input.use_beam_search,
                # TODO (varun) : Make temperature configurable
                # "temperature": 0.0 if request_func_input.use_beam_search \
                #     else 1.0,
                "temperature": 0.0,
                "top_p": 1.0,
                "max_tokens": request_func_input.output_len,
                "ignore_eos": True,
                "stream": AsyncRequestVLLM.stream_server_outputs(),
            }
            output = RequestFuncOutput()
            output.prompt_len = len(request_func_input.prompt)

            ttft = 0
            st = time.perf_counter()
            try:
                async with session.post(url=api_url, json=payload) as response:
                    if response.status == 200:
                        data = None
                        async for part_data in response.content.iter_any():
                            if ttft == 0:
                                ttft = time.perf_counter() - st
                                output.ttft = ttft
                            data = part_data

                        output.latency = time.perf_counter() - st
                        output.server_response = data
                        output.success = True

                    else:
                        output.success = False
            except (aiohttp.ClientOSError, aiohttp.ServerDisconnectedError):
                output.success = False

            if pbar:
                pbar.update(1)
            return output


async def async_request_trt_llm(
@@ -306,7 +354,7 @@ async def async_request_openai_completions(

ASYNC_REQUEST_FUNCS = {
    "tgi": async_request_tgi,
    "vllm": async_request_vllm,
    "vllm": AsyncRequestVLLM.async_request_vllm,
    "deepspeed-mii": async_request_deepspeed_mii,
    "openai": async_request_openai_completions,
    "tensorrt-llm": async_request_trt_llm,
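For illustration only, a minimal sketch (not part of this diff) of how AsyncRequestVLLM.decode_server_response behaves when a streamed body arrives as several concatenated JSON chunks. The body bytes below are a made-up example, not real server output:

from neuralmagic.benchmarks.scripts.backend_request_func import AsyncRequestVLLM

# Hypothetical streamed body: two concatenated JSON chunks plus the trailing '\0'.
# json.loads() fails on the whole body, so the helper scans for '{"text":' starts,
# decodes the last complete chunk, and strips the prompt prefix.
body = b'{"text": ["Hello wor"]}{"text": ["Hello world!"]}\x00'
generated = AsyncRequestVLLM.decode_server_response(body, prompt_len=len("Hello "))
print(generated)  # -> world!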
27 changes: 27 additions & 0 deletions neuralmagic/benchmarks/scripts/benchmark_serving.py
@@ -41,6 +41,7 @@

from neuralmagic.benchmarks.scripts.backend_request_func import (
    ASYNC_REQUEST_FUNCS,
    AsyncRequestVLLM,
    RequestFuncInput,
    RequestFuncOutput,
)
@@ -184,6 +185,31 @@ def calculate_metrics(
    return metrics


def decode_generated_text(
        backend: str,
        outputs: List[RequestFuncOutput]) -> List[RequestFuncOutput]:

    if all(
            map(
                lambda output: not output.success or output.generated_text is
                not None, outputs)):
        # Nothing to do
        return outputs

    # At the moment, all backend request functions except async_request_vllm
    # report generated_text directly.
    assert backend == "vllm"
    assert all(
        map(
            lambda output: not output.success or output.server_response is
            not None, outputs))

    for output in outputs:
        output.generated_text = AsyncRequestVLLM.decode_server_response(
            output.server_response, output.prompt_len)
    return outputs


async def benchmark(backend: str, api_url: str, model_id: str,
                    tokenizer: PreTrainedTokenizerBase,
                    input_requests: List[Tuple[str, int, int]], best_of: int,
@@ -217,6 +243,7 @@ async def benchmark(backend: str, api_url: str, model_id: str,
                request_func(request_func_input=request_func_input,
                             pbar=pbar)))
    outputs = await asyncio.gather(*tasks)
    outputs = decode_generated_text(backend, outputs)

    if not disable_tqdm:
        pbar.close()
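Again for illustration only, a minimal standalone sketch (not part of this diff) of the two-phase flow the PR introduces: the async request stores the raw server bytes, and decode_generated_text fills in generated_text afterwards. The RequestFuncOutput fields below are populated with a hypothetical response, and the sketch assumes the benchmark modules import cleanly in your environment:

from neuralmagic.benchmarks.scripts.backend_request_func import RequestFuncOutput
from neuralmagic.benchmarks.scripts.benchmark_serving import decode_generated_text

# Hypothetical output as async_request_vllm would leave it: raw server bytes are
# stored, but generated_text has not been decoded yet.
output = RequestFuncOutput()
output.success = True
output.prompt_len = len("Hello ")
output.server_response = b'{"text": ["Hello world!"]}\x00'

# Offline decode, outside the async measurement loop.
decoded = decode_generated_text("vllm", [output])
print(decoded[0].generated_text)  # -> world!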
5 changes: 3 additions & 2 deletions neuralmagic/benchmarks/scripts/common.py
@@ -13,7 +13,7 @@
from vllm.transformers_utils.tokenizer import get_tokenizer
from .datasets_registry import SHAREGPT_PATH, SHAREGPT_DOWNLOAD_STR
from .backend_request_func import (RequestFuncInput, RequestFuncOutput,
                                   async_request_vllm)
                                   AsyncRequestVLLM)
from ...tools.call_cmd import call_cmd
@@ -169,7 +169,8 @@ async def process_requests(input_requests):
            )
            tasks.append(
                asyncio.create_task(
                    async_request_vllm(request_func_input=request_func_input)))
                    AsyncRequestVLLM.async_request_vllm(
                        request_func_input=request_func_input)))
        _ = await asyncio.gather(*tasks)

    tokenizer = get_tokenizer(model)