[Core] Scheduling optimization 2 #4280

Merged
merged 8 commits into from
Apr 23, 2024
Changes from 6 commits
233 changes: 229 additions & 4 deletions vllm/core/scheduler.py
@@ -285,6 +285,7 @@ def __init__(
self.prev_prompt = False
# Latency of the last prompt step
self.last_prompt_latency = 0.0
self.policy = PolicyFactory.get_policy(policy_name="fcfs")

@property
def lora_enabled(self) -> bool:
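
The first hunk stores an FCFS policy object on the scheduler at construction time (self.policy = PolicyFactory.get_policy(policy_name="fcfs")); later in the diff it is used by _schedule_before_regression via self.policy.sort_by_priority(...). Hoisting the policy construction out of the per-step path is the general pattern; a minimal sketch under toy names (FCFSPolicy, ToyScheduler, ToyGroup are illustrative, not the vLLM implementation):

from collections import namedtuple

ToyGroup = namedtuple("ToyGroup", ["request_id", "arrival_time"])


class FCFSPolicy:
    """Toy first-come-first-served policy: the oldest request gets highest priority."""

    def sort_by_priority(self, now, seq_groups):
        # Older arrival => larger (now - arrival_time) => scheduled first.
        return sorted(seq_groups, key=lambda g: now - g.arrival_time, reverse=True)


class ToyScheduler:
    def __init__(self) -> None:
        # Built once at construction instead of once per scheduling step,
        # mirroring the new self.policy field added in __init__ above.
        self.policy = FCFSPolicy()

    def step(self, now, running):
        return self.policy.sort_by_priority(now, running)


scheduler = ToyScheduler()
groups = [ToyGroup("r1", arrival_time=2.0), ToyGroup("r0", arrival_time=1.0)]
print(scheduler.step(now=3.0, running=groups))  # r0 (older) is scheduled first
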
@@ -349,6 +350,7 @@ def _schedule_running(
curr_loras: Optional[Set[int]],
policy: Policy,
enable_chunking: bool = False,
update_num_seqs_token_budget: bool = True,
) -> Tuple[deque, SchedulerRunningOutputs]:
"""Schedule sequence groups that are running.

@@ -366,6 +368,10 @@ def _schedule_running(
chunked number of tokens are scheduled if
`budget.num_batched_tokens` has not enough capacity to schedule
all tokens.
update_num_seqs_token_budget: Whether to update the token budget's
number of sequences. This flag exists as an optimization: if the
caller is known to call add_num_seqs itself, setting it to False
skips the redundant update here, which can improve performance.

Returns:
A tuple of remaining running queue (should be always 0) after
@@ -395,12 +401,12 @@
# We can have up to 1 running prefill at any given time in running
# queue, which means we can guarantee chunk size is at least 1.
assert num_running_tokens != 0
num_running_seqs = seq_group.get_max_num_running_seqs()

running_queue.popleft()
while not self._can_append_slots(seq_group):
budget.subtract_num_batched_tokens(seq_group.request_id,
num_running_tokens)
num_running_seqs = seq_group.get_max_num_running_seqs()
budget.subtract_num_seqs(seq_group.request_id,
num_running_seqs)
if curr_loras is not None and seq_group.lora_int_id > 0:
@@ -439,7 +445,9 @@
token_chunk_size=1))
budget.add_num_batched_tokens(seq_group.request_id,
num_running_tokens)
budget.add_num_seqs(seq_group.request_id, num_running_seqs)
if update_num_seqs_token_budget:
num_running_seqs = seq_group.get_max_num_running_seqs()
budget.add_num_seqs(seq_group.request_id, num_running_seqs)
if curr_loras is not None and seq_group.lora_int_id > 0:
curr_loras.add(seq_group.lora_int_id)

@@ -718,7 +726,8 @@ def _schedule_default(self) -> SchedulerOutputs:
budget,
curr_loras,
fcfs_policy,
enable_chunking=False)
enable_chunking=False,
update_num_seqs_token_budget=False)

# If any sequence group is preempted, do not swap in any sequence
# group. because it means there's no slot for new running requests.
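
Taken together, the hunks above defer get_max_num_running_seqs() behind the new update_num_seqs_token_budget flag and let _schedule_default opt out of the per-group budget update entirely. A minimal sketch of that pattern, assuming toy stand-ins (ToySeqGroup, ToyBudget, and schedule_decodes are illustrative, not the vLLM SchedulingBudget/SequenceGroup API):

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToySeqGroup:
    request_id: str
    seq_statuses: List[str]

    def get_max_num_running_seqs(self) -> int:
        # Stand-in for the real method: it walks every sequence in the group,
        # which is the per-group cost the new flag lets hot callers skip.
        return sum(1 for s in self.seq_statuses if s != "FINISHED")


@dataclass
class ToyBudget:
    num_seqs_per_request: Dict[str, int] = field(default_factory=dict)

    def add_num_seqs(self, request_id: str, num_seqs: int) -> None:
        self.num_seqs_per_request[request_id] = num_seqs


def schedule_decodes(groups: List[ToySeqGroup], budget: ToyBudget,
                     update_num_seqs_token_budget: bool = True) -> List[ToySeqGroup]:
    scheduled: List[ToySeqGroup] = []
    for group in groups:
        scheduled.append(group)
        if update_num_seqs_token_budget:
            # Only pay for the sequence count when the caller has not
            # opted out of per-group budget accounting.
            budget.add_num_seqs(group.request_id,
                                group.get_max_num_running_seqs())
    return scheduled

A caller that passes False is assumed to account for the admitted sequences itself, which appears to be why _schedule_default can skip the per-group calls in its hot loop.
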
@@ -849,12 +858,228 @@ def _schedule_chunked_prefill(self):
num_lookahead_slots=running_scheduled.num_lookahead_slots,
)

    def _schedule_before_regression(self) -> SchedulerOutputs:
Collaborator Author:

ignore this func

        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time.
        now = time.time()

        # Join waiting sequences if possible.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # The total number of sequences on the fly, including the
            # requests in the generation phase.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            leftover_waiting_sequences = deque()
            num_batched_tokens = 0
            while self._passed_delay(now) and self.waiting:
                seq_group = self.waiting[0]
                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                # get_len includes output tokens if the request has been
                # preempted.
                num_prefill_tokens = waiting_seqs[0].get_len()
                if num_prefill_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prefill_tokens} tokens) is too "
                        f"long and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the sequence group cannot be allocated, stop.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    break
                elif can_allocate == AllocStatus.NEVER:
                    logger.warning(
                        f"Input prompt ({num_prefill_tokens} tokens) is too "
                        f"long and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_waiting_sequences.appendleft(seq_group)
                        self.waiting.popleft()
                        continue

                # If the number of batched tokens exceeds the limit, stop.
                num_batched_tokens += num_prefill_tokens
                if (num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.waiting.popleft()
                self._allocate_and_set_running(seq_group)
                self.running.append(seq_group)
                num_curr_seqs += num_new_seqs
                scheduled.append(
                    ScheduledSequenceGroup(
                        seq_group=seq_group,
                        token_chunk_size=num_prefill_tokens))
            self.waiting.extendleft(leftover_waiting_sequences)

            if scheduled or ignored_seq_groups:
                self.prev_prompt = True
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    num_prefill_groups=len(scheduled),
                    num_batched_tokens=num_batched_tokens,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                    num_lookahead_slots=self._get_num_lookahead_slots(
                        is_prefill=True),
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self._can_append_slots(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop()
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # Append new slots to the sequence group.
                self._append_slots(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            leftover_swapped = deque()

            while self.swapped:
                seq_group = self.swapped[0]
                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_swapped.appendleft(seq_group)
                        self.swapped.popleft()
                        continue

                # If the sequence group cannot be swapped in, stop.
                if not self._can_swap_in(seq_group):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slots(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

            self.swapped.extendleft(leftover_swapped)

        # Each sequence in the generation phase only takes one token slot.
        # Therefore, the number of batched tokens is equal to the number of
        # sequences in the RUNNING state.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=[
                ScheduledSequenceGroup(seq_group=running_group,
                                       token_chunk_size=1)
                for running_group in self.running
            ],
            num_prefill_groups=0,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
            num_lookahead_slots=self._get_num_lookahead_slots(
                is_prefill=False),
        )
        return scheduler_outputs

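The slot-reservation loop in _schedule_before_regression above relies on Python's while ... else: the else branch runs only when the inner loop exits without break, i.e. once the current group fits (possibly after evicting victims from the back of the priority-sorted queue). A self-contained toy model of that control flow, with simplified block accounting rather than the real block manager (reserve_slots and held_blocks are made-up names):

from collections import deque
from typing import Deque, Dict, List, Tuple


def reserve_slots(
    running: Deque[str],
    held_blocks: Dict[str, int],
    free_blocks: int,
) -> Tuple[Deque[str], List[str], int]:
    """Toy model of the loop above: every running group needs one new KV block
    per step; if none is free, evict from the back of the priority-sorted queue."""
    kept: Deque[str] = deque()
    preempted: List[str] = []
    while running:
        group = running.popleft()
        while free_blocks < 1:
            if running:
                # Preempt the lowest-priority group and reclaim its blocks.
                victim = running.pop()
                free_blocks += held_blocks[victim]
                preempted.append(victim)
            else:
                # No other group left to preempt: preempt the current one.
                preempted.append(group)
                break
        else:
            # Reached only when the inner loop exits without `break`,
            # i.e. a block is available for this group.
            free_blocks -= 1
            held_blocks[group] += 1
            kept.append(group)
    return kept, preempted, free_blocks

For example, reserve_slots(deque(["a", "b", "c"]), {"a": 4, "b": 4, "c": 4}, free_blocks=0) preempts "c" so that "a" and "b" keep running.
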
def _schedule(self) -> SchedulerOutputs:
"""Schedule queued requests."""
if self.scheduler_config.chunked_prefill_enabled:
return self._schedule_chunked_prefill()
# s = time.time()
result = self._schedule_chunked_prefill()
# print(f"scheduler iter takes {(time.time() - s) * 1000} ms")
return result
else:
# s = time.time()
return self._schedule_default()
# return self._schedule_before_regression()
# print(f"scheduler iter takes {(time.time() - s) * 1000} ms")
# return result

def _can_append_slots(self, seq_group: SequenceGroup) -> bool:
"""Determine whether or not we have enough space in the KV cache to
5 changes: 5 additions & 0 deletions vllm/sequence.py
@@ -508,6 +508,11 @@ def get_num_uncomputed_tokens(self) -> int:
        return num_uncomputed_tokens

    def num_seqs(self, status: Optional[SequenceStatus] = None) -> int:
        # Optimization: we don't need to call get_seqs if we don't need to
        # filter by status.
        if status is None:
            return len(self.seqs_dict)

        return len(self.get_seqs(status))

    def num_unfinished_seqs(self) -> int:
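
The sequence.py change is a small fast path: when no status filter is requested, the group can report len(self.seqs_dict) directly instead of materializing a filtered list through get_seqs. A toy illustration of the idea (ToySequenceGroup is a stand-in, not the vLLM class):

from typing import Dict, List, Optional


class ToySequenceGroup:
    """Toy illustration of the num_seqs fast path; not the vLLM class."""

    def __init__(self, statuses: Dict[int, str]) -> None:
        # Maps sequence id -> status, mirroring the role of seqs_dict.
        self.seqs_dict = statuses

    def get_seqs(self, status: Optional[str] = None) -> List[int]:
        if status is None:
            return list(self.seqs_dict)
        return [seq_id for seq_id, s in self.seqs_dict.items() if s == status]

    def num_seqs(self, status: Optional[str] = None) -> int:
        # Fast path: no filtering needed, so skip building the list entirely.
        if status is None:
            return len(self.seqs_dict)
        return len(self.get_seqs(status))


group = ToySequenceGroup({0: "RUNNING", 1: "RUNNING", 2: "FINISHED"})
assert group.num_seqs() == 3            # fast path, no list is built
assert group.num_seqs("RUNNING") == 2   # filtered path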