[V1][PoC] Refactor EngineCoreOutputs #12853
base: main
Conversation
This pull request has merge conflicts that must be resolved before it can be merged.
Thanks @bnellnm, I added a few more comments...
vllm/v1/core/scheduler.py (outdated)
output.request_ids.append(req_id)
output.new_token_id_offsets.append(offset)
new_ids = request.output_token_ids[-num_new_tokens:]
output.new_token_ids += new_ids
It would be for another iteration, but I'm thinking here we may want to do this outside of the loop: keep the new token ids as a tensor and either send them as-is and do additional filtering in the front-end process, or do the filtering via tensor slicing/index_select type operations (a rough sketch follows after this list).
This would have the benefits of:
- Eliminating intermediate object creation, which scales with the batch size
- Eliminating serialization overhead - I think if we can keep as much as possible of EngineCoreOutputs in tensor/numpy form, we can transmit the backing buffer(s) as-is ... zmq can read directly from these, and later we could also see if we can use shm
- Moving some work to the front-end process, which we can more easily scale out
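A rough sketch of what the front-end-side slicing could look like (slice_new_tokens and its arguments are hypothetical names, not anything in this PR; it assumes the core ships one flat tensor of new token ids plus per-request counts):

import torch

def slice_new_tokens(new_token_ids: torch.Tensor,
                     new_token_counts: torch.Tensor) -> list:
    """Split one flat tensor of new token ids into per-request views."""
    # Exclusive prefix sum of the counts gives each request's start offset.
    offsets = torch.cumsum(new_token_counts, dim=0)
    starts = torch.cat([offsets.new_zeros(1), offsets[:-1]])
    # narrow() returns views, so no per-request copies are made here.
    return [new_token_ids.narrow(0, int(s), int(n))
            for s, n in zip(starts.tolist(), new_token_counts.tolist())]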
Thanks @bnellnm, I made some more comments inline
if num_requests <= VLLM_V1_OUTPUT_PROC_CHUNK_SIZE:
    num_chunks = 1
    chunk_size = num_requests
    rem = 0
else:
Could we just keep the `else` logic here?
I got a div-by-zero when I tried just the `else` code path, so I left both branches.
Hmm, that should only be possible if `outputs.request_ids` is empty ... I don't remember if that should ever happen, but if it does we would just skip the loop anyhow (unless we need to still update the iteration stats in this case).
for i, req_id in enumerate(
        engine_core_outputs.request_ids[first:last]):
Suggested change:
for req_idx in range(first, last):
    req_id = engine_core_outputs.request_ids[req_idx]
self._num_tokens = len(self.all_token_ids)
self._num_output_tokens = len(self.output_token_ids)
why the changes in this file?
I was doing some profiling and it showed `num_tokens` and `num_output_tokens` being fairly expensive/called often, so I figured I would cache the lengths.
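A minimal standalone sketch of that caching pattern (not the actual Request class; the class and method names here are illustrative): the cached lengths are bumped where tokens are appended, so hot-path callers read an int instead of recomputing len() on every access.

class CachedCounts:
    def __init__(self, prompt_token_ids: list):
        self.prompt_token_ids = prompt_token_ids
        self.output_token_ids: list = []
        self._num_tokens = len(prompt_token_ids)
        self._num_output_tokens = 0

    def append_output_token_ids(self, token_ids: list) -> None:
        # Keep the cached lengths in sync with the underlying lists.
        self.output_token_ids.extend(token_ids)
        self._num_output_tokens += len(token_ids)
        self._num_tokens += len(token_ids)

    @property
    def num_tokens(self) -> int:
        return self._num_tokens

    @property
    def num_output_tokens(self) -> int:
        return self._num_output_tokens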
@@ -34,13 +37,29 @@ def decode(self, obj: Any):
        return self.decoder.decode(obj)


class NumpySerializedRepresentation(msgspec.Struct, gc=False, array_like=True):
I am working on this part in this branch (hopefully it's in a pretty much complete/working state now): https://github.com/njhill/vllm/tree/tensor-nocopy
The idea is for the encoder to collect a list of the buffer references and return it at the end, rather than serializing any of them directly.
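A heavily simplified sketch of that idea (not the code in the linked branch; NumpyPlaceholder and BufferCollectingEncoder are made-up names): the enc_hook swaps each ndarray for a small placeholder and stashes the backing buffer, so the buffers can go out as separate zero-copy zmq frames rather than being serialized inline.

import msgspec
import numpy as np


class NumpyPlaceholder(msgspec.Struct, gc=False, array_like=True):
    dtype: str
    shape: tuple
    index: int  # position of the raw buffer in the out-of-band frame list


class BufferCollectingEncoder:

    def __init__(self):
        self._encoder = msgspec.msgpack.Encoder(enc_hook=self._enc_hook)
        self._buffers: list = []

    def _enc_hook(self, obj):
        if isinstance(obj, np.ndarray):
            # Assumes C-contiguous arrays; .data is a zero-copy memoryview.
            self._buffers.append(obj.data)
            return NumpyPlaceholder(str(obj.dtype), obj.shape,
                                    len(self._buffers) - 1)
        raise NotImplementedError(f"cannot encode {type(obj)}")

    def encode(self, obj) -> list:
        self._buffers = []
        main = self._encoder.encode(obj)
        # First frame is the msgpack payload, the rest are raw array buffers;
        # a zmq socket can send these as one multipart message.
        return [main, *self._buffers]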
num_samples = sampled_token_ids.shape[0]
max_gen_len = sampled_token_ids.shape[1]
Suggested change:
num_samples, max_gen_len = sampled_token_ids.shape
for i, sampled_ids in enumerate(sampled_token_ids):
#draft_token_ids: np.ndarray = np.full((num_samples, max_gen_len), INVALID_TOKEN_ID, dtype=int)

valid_mask = sampled_token_ids != INVALID_TOKEN_ID
Probably we still want to do the mask and sum on GPU and then that list of lengths could be passed in:
valid_mask = sampled_token_ids != INVALID_TOKEN_ID
gen_lens = valid_mask.sum(dim=1).tolist()
This line below you can change to read directly from the nparray:
self.input_batch.token_ids_cpu[i, start_idx:end_idx] = sampled_ids
i.e.
self.input_batch.token_ids_cpu[i, start_idx:end_idx] = sampled_token_ids[i, :gen_lens[i]]
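Pulling those pieces together as one hedged sketch (a standalone function with assumed arguments, not the runner's real code; it assumes valid tokens form a prefix of each row and that INVALID_TOKEN_ID is the sentinel for the rest):

import numpy as np
import torch

INVALID_TOKEN_ID = -1  # assumed sentinel for rejected/padded positions


def write_sampled_tokens(sampled_token_ids: torch.Tensor,
                         token_ids_cpu: np.ndarray,
                         start_indices: list) -> list:
    # Mask and per-row sum happen on the GPU; only the lengths cross over.
    valid_mask = sampled_token_ids != INVALID_TOKEN_ID
    gen_lens = valid_mask.sum(dim=1).tolist()
    sampled_np = sampled_token_ids.cpu().numpy()
    for i, num_gen in enumerate(gen_lens):
        start = start_indices[i]
        # Copy each row's valid prefix directly from the ndarray.
        token_ids_cpu[i, start:start + num_gen] = sampled_np[i, :num_gen]
    return gen_lens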
@@ -1042,8 +1058,11 @@ def generate_draft_token_ids(
            if drafter_output is None or len(drafter_output) == 0:
                draft_token_ids.append([])
            else:
                #assert len(drafter_output) <= max_gen_len
                #draft_token_ids[i] = drafter_output
                draft_token_ids.append(drafter_output.tolist())
We could keep this as a list of ndarrays. Later we could even update the drafter to take an ndarray as input and have it write the tokens into it, so we can build a single 1-dim array with offsets.
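A sketch of the flat-array variant (pack_draft_tokens is a hypothetical helper, not the drafter's real interface): draft tokens live in one 1-D ndarray and each request is addressed by an (offset, count) pair.

import numpy as np


def pack_draft_tokens(drafts: list):
    """Pack a list of per-request draft-token ndarrays into one flat array."""
    counts = np.array([len(d) for d in drafts], dtype=np.int32)
    offsets = np.concatenate(([0], np.cumsum(counts[:-1]))).astype(np.int32)
    flat = np.empty(int(counts.sum()), dtype=np.int32)
    for off, d in zip(offsets, drafts):
        # A drafter that accepted an output buffer could write here directly.
        flat[off:off + len(d)] = d
    return flat, offsets, counts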
new_token_id_offsets: List[int] = []
new_token_id_counts: Optional[List[int]] = None  # ndarray?
Yes keep as array ... and we don't need both offsets and counts right?
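Toy illustration of why only one of the two needs to be carried: the offsets are just an exclusive prefix sum of the counts (and the counts are the diff of the offsets plus the total).

import numpy as np

new_token_id_counts = np.array([2, 1, 3], dtype=np.int32)
new_token_id_offsets = np.concatenate(([0], np.cumsum(new_token_id_counts)[:-1]))
# new_token_id_offsets -> array([0, 2, 3])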
new_token_ids: np.ndarray = np.empty(0, dtype=int)  # Optional?

# req_id -> LogprobsLists
new_logprobs: Dict[str, LogprobsLists] = {}
We should change these to LogprobsTensors too.
Fold EngineCoreOutput fields directly into EngineCoreOutputs so that we don't need to create so many small objects in the scheduler.
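Roughly, the flattened shape being discussed might look like this (a dataclass sketch with illustrative field names; the real struct and its exact fields differ):

from dataclasses import dataclass, field
from typing import Dict, List

import numpy as np


@dataclass
class EngineCoreOutputsSketch:
    # One entry per request scheduled in this step.
    request_ids: List[str] = field(default_factory=list)
    # All newly sampled token ids in one flat array; the parallel counts say
    # how many belong to each request (offsets can be derived by cumsum).
    new_token_ids: np.ndarray = field(
        default_factory=lambda: np.empty(0, dtype=np.int32))
    new_token_id_counts: List[int] = field(default_factory=list)
    # Sparse per-request extras stay keyed by request id
    # (LogprobsLists/LogprobsTensors in the real code).
    new_logprobs: Dict[str, object] = field(default_factory=dict)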