
Commit

Refactor EngineCoreOutputs
bnellnm committed Feb 6, 2025
1 parent bc1bdec commit 9639b0d
Showing 10 changed files with 112 additions and 62 deletions.
7 changes: 6 additions & 1 deletion benchmarks/backend_request_func.py
@@ -272,10 +272,15 @@ async def async_request_openai_completions(
try:
async with session.post(url=api_url, json=payload,
headers=headers) as response:
#print(f"RES = {response.status}")
if response.status == 200:
first_chunk_received = False

#print(response)

async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
#print(f"CB = {chunk_bytes}")
if not chunk_bytes:
continue

@@ -313,7 +318,7 @@ async def async_request_openai_completions(
else:
output.success = False
output.error = (
"Never received a valid chunk to calculate TTFT."
"Never received a valid chunk to calculate TTFT. "
"This response will be marked as failed!")
output.generated_text = generated_text
output.latency = most_recent_timestamp - st
5 changes: 4 additions & 1 deletion benchmarks/benchmark_serving.py
@@ -564,7 +564,10 @@ async def benchmark(
)
test_output = await request_func(request_func_input=test_input)
if not test_output.success:
raise ValueError(
#raise ValueError(
# "Initial test run failed - Please make sure benchmark arguments "
# f"are correctly specified. Error: {test_output.error}")
print(
"Initial test run failed - Please make sure benchmark arguments "
f"are correctly specified. Error: {test_output.error}")
else:
42 changes: 26 additions & 16 deletions vllm/v1/core/scheduler.py
@@ -11,10 +11,10 @@
from vllm.v1.core.encoder_cache_manager import (EncoderCacheManager,
compute_encoder_budget)
from vllm.v1.core.kv_cache_manager import KVCacheManager
from vllm.v1.engine import EngineCoreOutput, EngineCoreOutputs
from vllm.v1.engine import EngineCoreOutputs
from vllm.v1.metrics.stats import SchedulerStats
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.request import FinishReason, Request, RequestStatus

Check failure on line 18 in vllm/v1/core/scheduler.py (GitHub Actions / pre-commit, Ruff): vllm/v1/core/scheduler.py:18:29: F401 `vllm.v1.request.FinishReason` imported but unused
if TYPE_CHECKING:
from vllm.multimodal import MultiModalKwargs
@@ -413,11 +413,19 @@ def update_from_output(
sampled_token_ids = model_runner_output.sampled_token_ids
num_scheduled_tokens = scheduler_output.num_scheduled_tokens
new_running: List[Request] = []
outputs: List[EngineCoreOutput] = []
output = EngineCoreOutputs(request_ids=[],
new_token_id_offsets=[],
new_token_ids=[],
finished=[],
finish_reason={},
stop_reason=[],
scheduler_stats=None
)

# NOTE(woosuk): As len(self.running) can be up to 1K or more, the below
# loop can be a performance bottleneck. We should do our best to avoid
# expensive operations inside the loop.
offset = 0
for request in self.running:
req_id = request.request_id
num_tokens_scheduled = num_scheduled_tokens.get(req_id, 0)
@@ -455,30 +463,32 @@ def update_from_output(
# TODO: Update the KV cache manager for prefix caching.

# Check for stop and update request state.
# This must be called before we make the EngineCoreOutput.
# This must be called before we make the EngineCoreOutputs.
stopped = self._check_stop(request)
if stopped:
self._free_request(request)

# Add EngineCoreOutput for this Request.
output = EngineCoreOutput(
request_id=req_id,
new_token_ids=request.output_token_ids[-num_new_tokens:],
finished=request.is_finished(),
finish_reason=request.get_finished_reason(),
stop_reason=request.stop_reason)
outputs.append(output)
# not a list of outputs here

# Add EngineCoreOutputs for this Request.
output.request_ids.append(req_id)
output.new_token_id_offsets.append(offset)
output.new_token_ids += request.output_token_ids[-num_new_tokens:]
output.finished.append(request.is_finished())
if request.get_finished_reason() is not None:
output.finish_reason[req_id] = request.get_finished_reason()
#print(f"req stop = {request.stop_reason}, {request.status}")
output.stop_reason.append(request.stop_reason)
offset = offset + 1 # move out of if?

# Breakout of the loop.
if stopped:
continue

new_running.append(request)
self.running = new_running
return EngineCoreOutputs(
outputs=outputs,
scheduler_stats=self.make_stats(),
)
output.scheduler_stats = self.make_stats()
return output

def _check_stop(self, request: Request) -> bool:
if (request.num_tokens >= self.max_model_len
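A note on the columnar refactor in this file: the hunk advances `offset` by one per request, while the slicing in output_processor.py reads consecutive offsets as positions in the flat `new_token_ids` buffer; the two only coincide when every request emits exactly one token per step. The standalone sketch below uses the token-position interpretation. The `EngineCoreOutputs` field names come from this diff, but the helper and its per-request input shape are illustrative, not part of the commit.

```python
from typing import Dict, List, Optional, Tuple, Union

# Illustrative per-request tuple:
# (request_id, new_token_ids, finished, finish_reason, stop_reason)
ReqResult = Tuple[str, List[int], bool, Optional[str], Union[int, str, None]]


def to_columnar(results: List[ReqResult]) -> dict:
    """Flatten per-request results into the columnar layout used by
    EngineCoreOutputs: parallel per-request lists plus one flat token buffer."""
    request_ids: List[str] = []
    new_token_id_offsets: List[int] = []
    new_token_ids: List[int] = []
    finished: List[bool] = []
    finish_reason: Dict[str, str] = {}
    stop_reason: List[Union[int, str, None]] = []

    offset = 0
    for req_id, tokens, done, reason, stop in results:
        request_ids.append(req_id)
        new_token_id_offsets.append(offset)  # start of this request's tokens
        new_token_ids += tokens
        finished.append(done)
        if reason is not None:
            finish_reason[req_id] = reason
        stop_reason.append(stop)
        offset += len(tokens)  # advance by token count, not by one

    return dict(request_ids=request_ids,
                new_token_id_offsets=new_token_id_offsets,
                new_token_ids=new_token_ids,
                finished=finished,
                finish_reason=finish_reason,
                stop_reason=stop_reason)


# Example: request "a" emitted token [1]; request "b" emitted tokens [2, 3].
cols = to_columnar([("a", [1], False, None, None),
                    ("b", [2, 3], True, "stop", None)])
assert cols["new_token_id_offsets"] == [0, 1]
assert cols["new_token_ids"] == [1, 2, 3]
```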
31 changes: 18 additions & 13 deletions vllm/v1/engine/__init__.py
@@ -2,7 +2,7 @@

import enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Union

import msgspec

@@ -59,17 +59,17 @@ class EngineCoreRequest:
lora_request: Optional["LoRARequest"]


class EngineCoreOutput(
msgspec.Struct,
array_like=True, # type: ignore[call-arg]
omit_defaults=True, # type: ignore[call-arg]
gc=False): # type: ignore[call-arg]

request_id: str
new_token_ids: List[int]
finished: bool
finish_reason: Optional[FinishReason] = None
stop_reason: Union[int, str, None] = None
#class EngineCoreOutput(
# msgspec.Struct,
# array_like=True, # type: ignore[call-arg]
# omit_defaults=True, # type: ignore[call-arg]
# gc=False): # type: ignore[call-arg]
#
# request_id: str
# new_token_ids: List[int]
# finished: bool
# finish_reason: Optional[FinishReason] = None
# stop_reason: Union[int, str, None] = None


class EngineCoreOutputs(
@@ -82,7 +82,12 @@ class EngineCoreOutputs(
# e.g. columnwise layout

# [num_reqs]
outputs: List[EngineCoreOutput]
request_ids: List[str]
new_token_id_offsets: List[int]
new_token_ids: List[int]
finished: List[bool]
finish_reason: Dict[str, FinishReason] # Union[List, Dict]?
stop_reason: List[Union[int, str, None]]
scheduler_stats: SchedulerStats


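To make the `# e.g. columnwise layout` comment concrete, here is a small msgspec round-trip over a toy struct that mirrors the columnar fields above. This is an illustration only, not the vLLM class: `FinishReason` is simplified to an `int` and `scheduler_stats` is omitted.

```python
from typing import Dict, List, Union

import msgspec


class ToyCoreOutputs(msgspec.Struct,
                     array_like=True,
                     omit_defaults=True,
                     gc=False):
    # Same columnar shape as EngineCoreOutputs, minus scheduler_stats.
    request_ids: List[str]
    new_token_id_offsets: List[int]
    new_token_ids: List[int]
    finished: List[bool]
    finish_reason: Dict[str, int]
    stop_reason: List[Union[int, str, None]]


encoder = msgspec.msgpack.Encoder()
decoder = msgspec.msgpack.Decoder(ToyCoreOutputs)

batch = ToyCoreOutputs(
    request_ids=["req-0", "req-1"],
    new_token_id_offsets=[0, 1],
    new_token_ids=[101, 102],
    finished=[False, True],
    finish_reason={"req-1": 0},
    stop_reason=[None, None],
)
wire = encoder.encode(batch)    # array_like=True: one flat msgpack array
decoded = decoder.decode(wire)  # fields come back as parallel lists
assert decoded.request_ids == batch.request_ids
```

With this layout, one batch serializes as a handful of homogeneous lists rather than a list of per-request structs.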
20 changes: 14 additions & 6 deletions vllm/v1/engine/async_llm.py
@@ -249,19 +249,27 @@ async def _run_output_handler(self):
# Split outputs into chunks of at most
# VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
# event loop for too long.
num_outputs = len(outputs.outputs)
num_outputs = len(outputs.new_token_id_offsets)

if num_outputs <= VLLM_V1_OUTPUT_PROC_CHUNK_SIZE:
slices = (outputs.outputs, )
slices = ((0, num_outputs), )
else:
slices = np.array_split(
outputs.outputs,
slices = []
parts = np.linspace(
num_outputs,
cdiv(num_outputs, VLLM_V1_OUTPUT_PROC_CHUNK_SIZE))
last = 0
for i in parts:
slices.append((last, i))
last = i
print(f"slices = {slices}")

iteration_stats = None
for i, outputs_slice in enumerate(slices):
for i, slice in enumerate(slices):
slice_start, slice_end = slice
# 2) Process EngineCoreOutputs.
processed_outputs = self.output_processor.process_outputs(
outputs_slice, iteration_stats)
outputs, slice_start, slice_end, iteration_stats)
# NOTE: RequestOutputs are pushed to their queues.
assert not processed_outputs.request_outputs
iteration_stats = processed_outputs.iteration_stats
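The slicing code above still looks like work in progress: `np.linspace` is called without its `num` argument, it yields float boundaries, and a debug `print` remains. Below is a minimal sketch of one way to produce `(start, end)` pairs covering at most `VLLM_V1_OUTPUT_PROC_CHUNK_SIZE` outputs each using plain `range`; the helper name and the constant value are illustrative, not part of the commit.

```python
from typing import List, Tuple

CHUNK_SIZE = 128  # stand-in for VLLM_V1_OUTPUT_PROC_CHUNK_SIZE


def chunk_bounds(num_outputs: int,
                 chunk_size: int = CHUNK_SIZE) -> List[Tuple[int, int]]:
    """Split [0, num_outputs) into half-open (start, end) ranges of at most
    chunk_size items, so each chunk can be handed to process_outputs()
    without blocking the event loop for too long."""
    return [(start, min(start + chunk_size, num_outputs))
            for start in range(0, num_outputs, chunk_size)]


assert chunk_bounds(5, 2) == [(0, 2), (2, 4), (4, 5)]
assert chunk_bounds(0, 2) == []
```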
10 changes: 9 additions & 1 deletion vllm/v1/engine/core.py
@@ -123,7 +123,14 @@ def step(self) -> EngineCoreOutputs:

if not self.scheduler.has_unfinished_requests():
return EngineCoreOutputs(
outputs=[], scheduler_stats=self.scheduler.make_stats())
request_ids=[],
new_token_id_offsets=[],
new_token_ids=[],
finished=[],
finish_reason={},
stop_reason=[],
scheduler_stats=self.scheduler.make_stats()
)

scheduler_output = self.scheduler.schedule()
output = self.model_executor.execute_model(scheduler_output)
@@ -299,5 +306,6 @@ def process_output_socket(self, output_path: str):
with zmq_socket_ctx(output_path, zmq.constants.PUSH) as socket:
while True:
outputs = self.output_queue.get()
#print(outputs)
encoder.encode_into(outputs, buffer)
socket.send_multipart((buffer, ), copy=False)
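Both this `step()` hunk and scheduler.py now build an all-empty columnar `EngineCoreOutputs` field by field. A small factory could remove the duplication; the helper below is a hypothetical sketch on top of the refactored types, not part of the commit.

```python
from typing import Optional

from vllm.v1.engine import EngineCoreOutputs
from vllm.v1.metrics.stats import SchedulerStats


def empty_engine_core_outputs(
        scheduler_stats: Optional[SchedulerStats] = None) -> EngineCoreOutputs:
    """Build an EngineCoreOutputs carrying no per-request rows."""
    return EngineCoreOutputs(request_ids=[],
                             new_token_id_offsets=[],
                             new_token_ids=[],
                             finished=[],
                             finish_reason={},
                             stop_reason=[],
                             scheduler_stats=scheduler_stats)
```

With such a helper, the early return in `step()` above could read `return empty_engine_core_outputs(self.scheduler.make_stats())`.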
10 changes: 4 additions & 6 deletions vllm/v1/engine/detokenizer.py
@@ -8,7 +8,7 @@
from vllm.sampling_params import RequestOutputKind
from vllm.transformers_utils.detokenizer_utils import (
AnyTokenizer, convert_prompt_ids_to_tokens, detokenize_incrementally)
from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest, FinishReason
from vllm.v1.engine import EngineCoreOutputs, EngineCoreRequest, FinishReason

Check failure on line 11 in vllm/v1/engine/detokenizer.py (GitHub Actions / pre-commit, Ruff): vllm/v1/engine/detokenizer.py:11:28: F401 `vllm.v1.engine.EngineCoreOutputs` imported but unused

logger = init_logger(__name__)

@@ -98,18 +98,16 @@ def from_new_request(

def update_from_output(
self,
output: EngineCoreOutput,
new_token_ids: List[int],
finish_reason: Optional[FinishReason],
stop_reason: Union[int, str, None],
) -> Optional[DetokenizerOutput]:
"""
Update RequestState for the request_id by:
1) Detokenize the new token ids incrementally.
2) Update the RequestOutput with the new text.
"""

new_token_ids = output.new_token_ids
finish_reason = output.finish_reason
stop_reason = output.stop_reason

# 1) Detokenize the new token ids incrementally.
# TODO(woosuk): This method becomes very inefficient when the number of
# new_token_ids is more than 1. We need to optimize this.
6 changes: 4 additions & 2 deletions vllm/v1/engine/llm_engine.py
@@ -143,12 +143,14 @@ def add_request(

def step(self) -> List[RequestOutput]:

# 1) Get EngineCoreOutput from the EngineCore.
# 1) Get EngineCoreOutputs from the EngineCore.
outputs = self.engine_core.get_output()

# 2) Process EngineCoreOutputs.
processed_outputs = self.output_processor.process_outputs(
outputs.outputs)
outputs,
0,
len(outputs.request_ids))

# 3) Abort any reqs that finished due to stop strings.
self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
36 changes: 24 additions & 12 deletions vllm/v1/engine/output_processor.py
@@ -7,7 +7,7 @@
from vllm.outputs import RequestOutput
from vllm.transformers_utils.detokenizer_utils import AnyTokenizer
from vllm.transformers_utils.tokenizer_group import BaseTokenizerGroup
from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest
from vllm.v1.engine import EngineCoreOutputs, EngineCoreRequest
from vllm.v1.engine.detokenizer import (DetokenizerOutput,
IncrementalDetokenizer)
from vllm.v1.metrics.stats import IterationStats, RequestStateStats
@@ -106,59 +106,71 @@ def add_request(

def process_outputs(
self,
engine_core_outputs: List[EngineCoreOutput],
engine_core_outputs: EngineCoreOutputs,
first: int,
last: int,
iteration_stats: Optional[IterationStats] = None,
) -> OutputProcessorOutput:
"""
Process the EngineCoreOutputs:
1) Compute stats for logging
2) Detokenize
3) Create and handle RequestOutput objects:
* If there is a queue (for usage with AsyncLLM),
* If there is a queue (for usage with AsyncLLM),
put the RequestOutput objects into the queue for
handling by the per-request generate() tasks.
* If there is no queue (for usage with LLMEngine),
* If there is no queue (for usage with LLMEngine),
return a list of RequestOutput objects.
****************** NOTE FOR DEVELOPERS ******************
VLLM V1 minimizes the number of python loops over the full
batch to ensure system overheads are minimized. This is the
batch to ensure system overheads are minimized. This is the
only function that should loop over EngineCoreOutputs.
If you need to touch every element of the batch, implement a
method called XXXClass.update_from_output() to be called
within the loop below. For examples, see:
* IterationStats.update_from_output()
* Detokenizer.update_from_output()
TODO(rob): add Protocol makes update_from_output explicit.
**********************************************************
"""

request_outputs: List[RequestOutput] = []
reqs_to_abort: List[str] = []
if not iteration_stats:
iteration_stats = IterationStats(self.log_stats)
for engine_core_output in engine_core_outputs:
req_id = engine_core_output.request_id
for i, req_id in enumerate(engine_core_outputs.request_ids[first:last]):
req_state = self.request_states.get(req_id)
if req_state is None:
# Ignore output for already-aborted request.
continue

num_tokens = last - first # might not be robust
start = engine_core_outputs.new_token_id_offsets[i]
end = engine_core_outputs.new_token_id_offsets[i + 1] if i < num_tokens - 1 else -1
# better way to do this?
new_token_ids = engine_core_outputs.new_token_ids[start:end]

# 1) Compute stats for this iteration.
iteration_stats.update_from_output(engine_core_output,
iteration_stats.update_from_output(num_tokens,
req_state.is_prefilling,
req_state.prompt_len,
req_state.stats)
req_state.is_prefilling = False

# 2) Detokenize the token ids into text.
#print(f"finish = {engine_core_outputs.finish_reason.get(req_id)}")
#print(f"stop = {engine_core_outputs.stop_reason[i + first]}")
detokenizer_output = req_state.detokenizer.update_from_output(
engine_core_output)
new_token_ids,
engine_core_outputs.finish_reason.get(req_id),
engine_core_outputs.stop_reason[i + first],
)

# 3) Create and handle RequestOutput objects.
if detokenizer_output is not None:
@@ -177,7 +189,7 @@ def process_outputs(
assert detokenizer_output.finish_reason is not None

self.request_states.pop(req_id)
if not engine_core_output.finished:
if not engine_core_outputs.finished[i]:
# If req not finished in EngineCore, but Detokenizer
# detected stop string, abort needed in EngineCore.
reqs_to_abort.append(req_id)
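On the `# better way to do this?` branch above: `num_tokens = last - first` appears to count requests in the slice rather than tokens, the offsets are indexed with `i` while `stop_reason` uses `i + first`, and the `else -1` fallback would drop the final request's last token if offsets are positions in `new_token_ids`. A minimal sketch of slicing against consecutive offsets with an explicit end-of-buffer fallback; the helper name is illustrative and it assumes `new_token_id_offsets[i]` is the index of request i's first new token.

```python
from typing import List


def slice_new_token_ids(new_token_ids: List[int],
                        new_token_id_offsets: List[int],
                        i: int) -> List[int]:
    """Return request i's new token ids from the flat buffer, using the next
    offset (or the buffer length for the last request) as the end index."""
    start = new_token_id_offsets[i]
    end = (new_token_id_offsets[i + 1]
           if i + 1 < len(new_token_id_offsets) else len(new_token_ids))
    return new_token_ids[start:end]


# Two requests: req 0 produced [7]; req 1 produced [8, 9].
assert slice_new_token_ids([7, 8, 9], [0, 1], 0) == [7]
assert slice_new_token_ids([7, 8, 9], [0, 1], 1) == [8, 9]
```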