From 31c9e5bfac0b2c63b7239b30e893b86724cd4f12 Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 02:20:14 -0700 Subject: [PATCH 1/7] ip --- vllm/core/scheduler.py | 218 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 214 insertions(+), 4 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 4198550621030..efbfa992139b7 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -285,6 +285,7 @@ def __init__( self.prev_prompt = False # Latency of the last prompt step self.last_prompt_latency = 0.0 + self.policy = PolicyFactory.get_policy(policy_name="fcfs") @property def lora_enabled(self) -> bool: @@ -659,7 +660,7 @@ def _schedule_prefills( if curr_loras is not None and lora_int_id > 0: curr_loras.add(lora_int_id) waiting_queue.popleft() - self._allocate_and_set_running(seq_group, num_new_tokens) + self._allocate_and_set_running(seq_group) seq_groups.append( ScheduledSequenceGroup(seq_group=seq_group, token_chunk_size=num_new_tokens)) @@ -851,12 +852,222 @@ def _schedule_chunked_prefill(self): num_lookahead_slots=running_scheduled.num_lookahead_slots, ) + def _schedule_before_regression(self) -> SchedulerOutputs: + # Blocks that need to be swapped or copied before model execution. + blocks_to_swap_in: Dict[int, int] = {} + blocks_to_swap_out: Dict[int, int] = {} + blocks_to_copy: Dict[int, List[int]] = {} + + # Fix the current time. + now = time.time() + + # Join waiting sequences if possible. + if not self.swapped: + ignored_seq_groups: List[SequenceGroup] = [] + scheduled: List[SequenceGroup] = [] + # The total number of sequences on the fly, including the + # requests in the generation phase. + num_curr_seqs = sum(seq_group.get_max_num_running_seqs() + for seq_group in self.running) + curr_loras = set( + seq_group.lora_int_id + for seq_group in self.running) if self.lora_enabled else None + + # Optimization: We do not sort the waiting queue since the preempted + # sequence groups are added to the front and the new sequence groups + # are added to the back. + leftover_waiting_sequences = deque() + num_batched_tokens = 0 + while self._passed_delay(now) and self.waiting: + seq_group = self.waiting[0] + waiting_seqs = seq_group.get_seqs( + status=SequenceStatus.WAITING) + assert len(waiting_seqs) == 1, ( + "Waiting sequence group should have only one prompt " + "sequence.") + # get_len includes output tokens if the request has been + # preempted. + num_prefill_tokens = waiting_seqs[0].get_len() + if num_prefill_tokens > self.prompt_limit: + logger.warning( + f"Input prompt ({num_prefill_tokens} tokens) is too " + f"long and exceeds limit of {self.prompt_limit}") + for seq in waiting_seqs: + seq.status = SequenceStatus.FINISHED_IGNORED + ignored_seq_groups.append(seq_group) + self.waiting.popleft() + continue + + # If the sequence group cannot be allocated, stop. 
+ can_allocate = self.block_manager.can_allocate(seq_group) + if can_allocate == AllocStatus.LATER: + break + elif can_allocate == AllocStatus.NEVER: + logger.warning( + f"Input prompt ({num_prefill_tokens} tokens) is too " + f"long and exceeds the capacity of block_manager") + for seq in waiting_seqs: + seq.status = SequenceStatus.FINISHED_IGNORED + ignored_seq_groups.append(seq_group) + self.waiting.popleft() + continue + + lora_int_id = 0 + if self.lora_enabled: + lora_int_id = seq_group.lora_int_id + if (lora_int_id > 0 and lora_int_id not in curr_loras + and len(curr_loras) >= self.lora_config.max_loras): + # We don't have a space for another LoRA, so + # we ignore this request for now. + leftover_waiting_sequences.appendleft(seq_group) + self.waiting.popleft() + continue + + # If the number of batched tokens exceeds the limit, stop. + num_batched_tokens += num_prefill_tokens + if (num_batched_tokens > + self.scheduler_config.max_num_batched_tokens): + break + + # The total number of sequences in the RUNNING state should not + # exceed the maximum number of sequences. + num_new_seqs = seq_group.get_max_num_running_seqs() + if (num_curr_seqs + num_new_seqs > + self.scheduler_config.max_num_seqs): + break + + if lora_int_id > 0: + curr_loras.add(lora_int_id) + self.waiting.popleft() + self._allocate_and_set_running(seq_group) + self.running.append(seq_group) + num_curr_seqs += num_new_seqs + scheduled.append( + ScheduledSequenceGroup( + seq_group=seq_group, + token_chunk_size=num_prefill_tokens)) + self.waiting.extendleft(leftover_waiting_sequences) + + if scheduled or ignored_seq_groups: + self.prev_prompt = True + scheduler_outputs = SchedulerOutputs( + scheduled_seq_groups=scheduled, + num_prefill_groups=len(scheduled), + num_batched_tokens=num_batched_tokens, + blocks_to_swap_in=blocks_to_swap_in, + blocks_to_swap_out=blocks_to_swap_out, + blocks_to_copy=blocks_to_copy, + ignored_seq_groups=ignored_seq_groups, + num_lookahead_slots=self._get_num_lookahead_slots( + is_prefill=True), + ) + return scheduler_outputs + + # NOTE(woosuk): Preemption happens only when there is no available slot + # to keep all the sequence groups in the RUNNING state. + # In this case, the policy is responsible for deciding which sequence + # groups to preempt. + self.running = self.policy.sort_by_priority(now, self.running) + + # Reserve new token slots for the running sequence groups. + running: Deque[SequenceGroup] = deque() + preempted: List[SequenceGroup] = [] + while self.running: + seq_group = self.running.popleft() + while not self._can_append_slots(seq_group): + if self.running: + # Preempt the lowest-priority sequence groups. + victim_seq_group = self.running.pop() + self._preempt(victim_seq_group, blocks_to_swap_out) + preempted.append(victim_seq_group) + else: + # No other sequence groups can be preempted. + # Preempt the current sequence group. + self._preempt(seq_group, blocks_to_swap_out) + preempted.append(seq_group) + break + else: + # Append new slots to the sequence group. + self._append_slots(seq_group, blocks_to_copy) + running.append(seq_group) + self.running = running + + # Swap in the sequence groups in the SWAPPED state if possible. 
+ self.swapped = self.policy.sort_by_priority(now, self.swapped) + if not preempted: + num_curr_seqs = sum(seq_group.get_max_num_running_seqs() + for seq_group in self.running) + curr_loras = set( + seq_group.lora_int_id + for seq_group in self.running) if self.lora_enabled else None + + leftover_swapped = deque() + + while self.swapped: + seq_group = self.swapped[0] + lora_int_id = 0 + if self.lora_enabled: + lora_int_id = seq_group.lora_int_id + if (lora_int_id > 0 and lora_int_id not in curr_loras + and len(curr_loras) >= self.lora_config.max_loras): + # We don't have a space for another LoRA, so + # we ignore this request for now. + leftover_swapped.appendleft(seq_group) + self.swapped.popleft() + continue + + # If the sequence group cannot be swapped in, stop. + if not self._can_swap_in(seq_group): + break + + # The total number of sequences in the RUNNING state should not + # exceed the maximum number of sequences. + num_new_seqs = seq_group.get_max_num_running_seqs() + if (num_curr_seqs + num_new_seqs > + self.scheduler_config.max_num_seqs): + break + + if lora_int_id > 0: + curr_loras.add(lora_int_id) + self.swapped.popleft() + self._swap_in(seq_group, blocks_to_swap_in) + self._append_slots(seq_group, blocks_to_copy) + num_curr_seqs += num_new_seqs + self.running.append(seq_group) + + self.swapped.extendleft(leftover_swapped) + + # Each sequence in the generation phase only takes one token slot. + # Therefore, the number of batched tokens is equal to the number of + # sequences in the RUNNING state. + num_batched_tokens = sum( + seq_group.num_seqs(status=SequenceStatus.RUNNING) + for seq_group in self.running) + + scheduler_outputs = SchedulerOutputs( + scheduled_seq_groups=[ + ScheduledSequenceGroup(seq_group=running_group, + token_chunk_size=1) + for running_group in self.running + ], + num_prefill_groups=0, + num_batched_tokens=num_batched_tokens, + blocks_to_swap_in=blocks_to_swap_in, + blocks_to_swap_out=blocks_to_swap_out, + blocks_to_copy=blocks_to_copy, + ignored_seq_groups=[], + num_lookahead_slots=self._get_num_lookahead_slots( + is_prefill=False), + ) + return scheduler_outputs + def _schedule(self) -> SchedulerOutputs: """Schedule queued requests.""" if self.scheduler_config.chunked_prefill_enabled: return self._schedule_chunked_prefill() else: - return self._schedule_default() + # return self._schedule_default() + return self._schedule_before_regression() def _can_append_slots(self, seq_group: SequenceGroup) -> bool: """Determine whether or not we have enough space in the KV cache to @@ -952,8 +1163,7 @@ def free_finished_seq_groups(self) -> None: self.running = deque(seq_group for seq_group in self.running if not seq_group.is_finished()) - def _allocate_and_set_running(self, seq_group: SequenceGroup, - num_new_tokens: int) -> None: + def _allocate_and_set_running(self, seq_group: SequenceGroup) -> None: self.block_manager.allocate(seq_group) for seq in seq_group.get_seqs(status=SequenceStatus.WAITING): seq.status = SequenceStatus.RUNNING From ac78b7791b854b6cf603f58e1d50c32d2569463a Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 03:52:46 -0700 Subject: [PATCH 2/7] fix one issue --- vllm/core/scheduler.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index efbfa992139b7..7d98645e02e13 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -298,7 +298,6 @@ def num_decoding_tokens_per_seq(self) -> int: def add_seq_group(self, seq_group: SequenceGroup) -> None: # 
Add sequence groups to the waiting queue. - logger.debug(f"add_seq_group {seq_group.request_id}") self.waiting.append(seq_group) def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None: @@ -428,7 +427,7 @@ def _schedule_running( swapped_out.append(seq_group) break else: - logger.debug(f"append slot for {seq_group}") + # logger.debug(f"append slot for {seq_group}") self._append_slots(seq_group, blocks_to_copy) is_prefill = seq_group.is_prefill() if is_prefill: @@ -1064,10 +1063,16 @@ def _schedule_before_regression(self) -> SchedulerOutputs: def _schedule(self) -> SchedulerOutputs: """Schedule queued requests.""" if self.scheduler_config.chunked_prefill_enabled: - return self._schedule_chunked_prefill() + s = time.time() + result = self._schedule_chunked_prefill() + print(f"scheduler iter takes {(time.time() - s) * 1000} ms") + return result else: - # return self._schedule_default() - return self._schedule_before_regression() + s = time.time() + result = self._schedule_default() + # result = self._schedule_before_regression() + print(f"scheduler iter takes {(time.time() - s) * 1000} ms") + return result def _can_append_slots(self, seq_group: SequenceGroup) -> bool: """Determine whether or not we have enough space in the KV cache to From cc1b303c4f9a3b31578e4ee99f4850828141e601 Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 03:53:02 -0700 Subject: [PATCH 3/7] , --- vllm/core/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 7d98645e02e13..b677d4ac133d6 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -1063,15 +1063,15 @@ def _schedule_before_regression(self) -> SchedulerOutputs: def _schedule(self) -> SchedulerOutputs: """Schedule queued requests.""" if self.scheduler_config.chunked_prefill_enabled: - s = time.time() + # s = time.time() result = self._schedule_chunked_prefill() - print(f"scheduler iter takes {(time.time() - s) * 1000} ms") + # print(f"scheduler iter takes {(time.time() - s) * 1000} ms") return result else: - s = time.time() + # s = time.time() result = self._schedule_default() # result = self._schedule_before_regression() - print(f"scheduler iter takes {(time.time() - s) * 1000} ms") + # print(f"scheduler iter takes {(time.time() - s) * 1000} ms") return result def _can_append_slots(self, seq_group: SequenceGroup) -> bool: From d74115745847f82b640e2681a3215fa00384ed9c Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 07:26:33 -0700 Subject: [PATCH 4/7] done --- vllm/core/scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index b677d4ac133d6..79a99f42ce847 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -427,7 +427,6 @@ def _schedule_running( swapped_out.append(seq_group) break else: - # logger.debug(f"append slot for {seq_group}") self._append_slots(seq_group, blocks_to_copy) is_prefill = seq_group.is_prefill() if is_prefill: From ca079da7c3e3a3f8100bb06e6083f3af0ffec881 Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 17:54:10 -0700 Subject: [PATCH 5/7] done --- vllm/core/scheduler.py | 20 ++++++++++++++------ vllm/sequence.py | 5 +++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 79a99f42ce847..3df28cab46938 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -350,6 +350,7 @@ def _schedule_running( curr_loras: Optional[Set[int]], policy: Policy, 
enable_chunking: bool = False, + update_num_seqs_token_budget: bool = True, ) -> Tuple[deque, SchedulerRunningOutputs]: """Schedule sequence groups that are running. @@ -367,6 +368,10 @@ def _schedule_running( chunked number of tokens are scheduled if `budget.num_batched_tokens` has not enough capacity to schedule all tokens. + update_num_seqs_token_budget: Do not update token_budget's num seq + groups. This flag is for optimization. If we know the caller + updates add_num_seqs, setting it to False will skip calling + this method which can improve performance. Returns: A tuple of remaining running queue (should be always 0) after @@ -396,12 +401,12 @@ def _schedule_running( # We can have up to 1 running prefill at any given time in running # queue, which means we can guarantee chunk size is at least 1. assert num_running_tokens != 0 - num_running_seqs = seq_group.get_max_num_running_seqs() running_queue.popleft() while not self._can_append_slots(seq_group): budget.subtract_num_batched_tokens(seq_group.request_id, num_running_tokens) + num_running_seqs = seq_group.get_max_num_running_seqs() budget.subtract_num_seqs(seq_group.request_id, num_running_seqs) if curr_loras is not None and seq_group.lora_int_id > 0: @@ -440,7 +445,9 @@ def _schedule_running( token_chunk_size=1)) budget.add_num_batched_tokens(seq_group.request_id, num_running_tokens) - budget.add_num_seqs(seq_group.request_id, num_running_seqs) + if update_num_seqs_token_budget: + num_running_seqs = seq_group.get_max_num_running_seqs() + budget.add_num_seqs(seq_group.request_id, num_running_seqs) if curr_loras is not None and seq_group.lora_int_id > 0: curr_loras.add(seq_group.lora_int_id) @@ -719,7 +726,8 @@ def _schedule_default(self) -> SchedulerOutputs: budget, curr_loras, fcfs_policy, - enable_chunking=False) + enable_chunking=False, + update_num_seqs_token_budget=False) # If any sequence group is preempted, do not swap in any sequence # group. because it means there's no slot for new running requests. @@ -1068,10 +1076,10 @@ def _schedule(self) -> SchedulerOutputs: return result else: # s = time.time() - result = self._schedule_default() - # result = self._schedule_before_regression() + return self._schedule_default() + # return self._schedule_before_regression() # print(f"scheduler iter takes {(time.time() - s) * 1000} ms") - return result + # return result def _can_append_slots(self, seq_group: SequenceGroup) -> bool: """Determine whether or not we have enough space in the KV cache to diff --git a/vllm/sequence.py b/vllm/sequence.py index 7dcacab6f2ab6..b296b37a84f15 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -508,6 +508,11 @@ def get_num_uncomputed_tokens(self) -> int: return num_uncomputed_tokens def num_seqs(self, status: Optional[SequenceStatus] = None) -> int: + # Optimization. We don't need to call get_seqs if we don't need to + # filter by states. 
+ if status is None: + return len(self.seqs_dict) + return len(self.get_seqs(status)) def num_unfinished_seqs(self) -> int: From eac3aa8556606f68fe1e73c91de1111523f8238f Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 19:14:35 -0700 Subject: [PATCH 6/7] done --- vllm/core/scheduler.py | 233 ++--------------------------------------- 1 file changed, 7 insertions(+), 226 deletions(-) diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 3df28cab46938..8b36392b81748 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -285,7 +285,6 @@ def __init__( self.prev_prompt = False # Latency of the last prompt step self.last_prompt_latency = 0.0 - self.policy = PolicyFactory.get_policy(policy_name="fcfs") @property def lora_enabled(self) -> bool: @@ -350,7 +349,6 @@ def _schedule_running( curr_loras: Optional[Set[int]], policy: Policy, enable_chunking: bool = False, - update_num_seqs_token_budget: bool = True, ) -> Tuple[deque, SchedulerRunningOutputs]: """Schedule sequence groups that are running. @@ -368,10 +366,6 @@ def _schedule_running( chunked number of tokens are scheduled if `budget.num_batched_tokens` has not enough capacity to schedule all tokens. - update_num_seqs_token_budget: Do not update token_budget's num seq - groups. This flag is for optimization. If we know the caller - updates add_num_seqs, setting it to False will skip calling - this method which can improve performance. Returns: A tuple of remaining running queue (should be always 0) after @@ -445,7 +439,11 @@ def _schedule_running( token_chunk_size=1)) budget.add_num_batched_tokens(seq_group.request_id, num_running_tokens) - if update_num_seqs_token_budget: + # OPTIMIZATION: Note that get_max_num_running_seqs is + # expensive. For the default scheduling chase where + # enable_chunking is False, num_seqs are updated before running + # this method, so we don't have to update it again here. + if not enable_chunking: num_running_seqs = seq_group.get_max_num_running_seqs() budget.add_num_seqs(seq_group.request_id, num_running_seqs) if curr_loras is not None and seq_group.lora_int_id > 0: @@ -726,8 +724,7 @@ def _schedule_default(self) -> SchedulerOutputs: budget, curr_loras, fcfs_policy, - enable_chunking=False, - update_num_seqs_token_budget=False) + enable_chunking=False) # If any sequence group is preempted, do not swap in any sequence # group. because it means there's no slot for new running requests. @@ -858,228 +855,12 @@ def _schedule_chunked_prefill(self): num_lookahead_slots=running_scheduled.num_lookahead_slots, ) - def _schedule_before_regression(self) -> SchedulerOutputs: - # Blocks that need to be swapped or copied before model execution. - blocks_to_swap_in: Dict[int, int] = {} - blocks_to_swap_out: Dict[int, int] = {} - blocks_to_copy: Dict[int, List[int]] = {} - - # Fix the current time. - now = time.time() - - # Join waiting sequences if possible. - if not self.swapped: - ignored_seq_groups: List[SequenceGroup] = [] - scheduled: List[SequenceGroup] = [] - # The total number of sequences on the fly, including the - # requests in the generation phase. - num_curr_seqs = sum(seq_group.get_max_num_running_seqs() - for seq_group in self.running) - curr_loras = set( - seq_group.lora_int_id - for seq_group in self.running) if self.lora_enabled else None - - # Optimization: We do not sort the waiting queue since the preempted - # sequence groups are added to the front and the new sequence groups - # are added to the back. 
- leftover_waiting_sequences = deque() - num_batched_tokens = 0 - while self._passed_delay(now) and self.waiting: - seq_group = self.waiting[0] - waiting_seqs = seq_group.get_seqs( - status=SequenceStatus.WAITING) - assert len(waiting_seqs) == 1, ( - "Waiting sequence group should have only one prompt " - "sequence.") - # get_len includes output tokens if the request has been - # preempted. - num_prefill_tokens = waiting_seqs[0].get_len() - if num_prefill_tokens > self.prompt_limit: - logger.warning( - f"Input prompt ({num_prefill_tokens} tokens) is too " - f"long and exceeds limit of {self.prompt_limit}") - for seq in waiting_seqs: - seq.status = SequenceStatus.FINISHED_IGNORED - ignored_seq_groups.append(seq_group) - self.waiting.popleft() - continue - - # If the sequence group cannot be allocated, stop. - can_allocate = self.block_manager.can_allocate(seq_group) - if can_allocate == AllocStatus.LATER: - break - elif can_allocate == AllocStatus.NEVER: - logger.warning( - f"Input prompt ({num_prefill_tokens} tokens) is too " - f"long and exceeds the capacity of block_manager") - for seq in waiting_seqs: - seq.status = SequenceStatus.FINISHED_IGNORED - ignored_seq_groups.append(seq_group) - self.waiting.popleft() - continue - - lora_int_id = 0 - if self.lora_enabled: - lora_int_id = seq_group.lora_int_id - if (lora_int_id > 0 and lora_int_id not in curr_loras - and len(curr_loras) >= self.lora_config.max_loras): - # We don't have a space for another LoRA, so - # we ignore this request for now. - leftover_waiting_sequences.appendleft(seq_group) - self.waiting.popleft() - continue - - # If the number of batched tokens exceeds the limit, stop. - num_batched_tokens += num_prefill_tokens - if (num_batched_tokens > - self.scheduler_config.max_num_batched_tokens): - break - - # The total number of sequences in the RUNNING state should not - # exceed the maximum number of sequences. - num_new_seqs = seq_group.get_max_num_running_seqs() - if (num_curr_seqs + num_new_seqs > - self.scheduler_config.max_num_seqs): - break - - if lora_int_id > 0: - curr_loras.add(lora_int_id) - self.waiting.popleft() - self._allocate_and_set_running(seq_group) - self.running.append(seq_group) - num_curr_seqs += num_new_seqs - scheduled.append( - ScheduledSequenceGroup( - seq_group=seq_group, - token_chunk_size=num_prefill_tokens)) - self.waiting.extendleft(leftover_waiting_sequences) - - if scheduled or ignored_seq_groups: - self.prev_prompt = True - scheduler_outputs = SchedulerOutputs( - scheduled_seq_groups=scheduled, - num_prefill_groups=len(scheduled), - num_batched_tokens=num_batched_tokens, - blocks_to_swap_in=blocks_to_swap_in, - blocks_to_swap_out=blocks_to_swap_out, - blocks_to_copy=blocks_to_copy, - ignored_seq_groups=ignored_seq_groups, - num_lookahead_slots=self._get_num_lookahead_slots( - is_prefill=True), - ) - return scheduler_outputs - - # NOTE(woosuk): Preemption happens only when there is no available slot - # to keep all the sequence groups in the RUNNING state. - # In this case, the policy is responsible for deciding which sequence - # groups to preempt. - self.running = self.policy.sort_by_priority(now, self.running) - - # Reserve new token slots for the running sequence groups. - running: Deque[SequenceGroup] = deque() - preempted: List[SequenceGroup] = [] - while self.running: - seq_group = self.running.popleft() - while not self._can_append_slots(seq_group): - if self.running: - # Preempt the lowest-priority sequence groups. 
- victim_seq_group = self.running.pop() - self._preempt(victim_seq_group, blocks_to_swap_out) - preempted.append(victim_seq_group) - else: - # No other sequence groups can be preempted. - # Preempt the current sequence group. - self._preempt(seq_group, blocks_to_swap_out) - preempted.append(seq_group) - break - else: - # Append new slots to the sequence group. - self._append_slots(seq_group, blocks_to_copy) - running.append(seq_group) - self.running = running - - # Swap in the sequence groups in the SWAPPED state if possible. - self.swapped = self.policy.sort_by_priority(now, self.swapped) - if not preempted: - num_curr_seqs = sum(seq_group.get_max_num_running_seqs() - for seq_group in self.running) - curr_loras = set( - seq_group.lora_int_id - for seq_group in self.running) if self.lora_enabled else None - - leftover_swapped = deque() - - while self.swapped: - seq_group = self.swapped[0] - lora_int_id = 0 - if self.lora_enabled: - lora_int_id = seq_group.lora_int_id - if (lora_int_id > 0 and lora_int_id not in curr_loras - and len(curr_loras) >= self.lora_config.max_loras): - # We don't have a space for another LoRA, so - # we ignore this request for now. - leftover_swapped.appendleft(seq_group) - self.swapped.popleft() - continue - - # If the sequence group cannot be swapped in, stop. - if not self._can_swap_in(seq_group): - break - - # The total number of sequences in the RUNNING state should not - # exceed the maximum number of sequences. - num_new_seqs = seq_group.get_max_num_running_seqs() - if (num_curr_seqs + num_new_seqs > - self.scheduler_config.max_num_seqs): - break - - if lora_int_id > 0: - curr_loras.add(lora_int_id) - self.swapped.popleft() - self._swap_in(seq_group, blocks_to_swap_in) - self._append_slots(seq_group, blocks_to_copy) - num_curr_seqs += num_new_seqs - self.running.append(seq_group) - - self.swapped.extendleft(leftover_swapped) - - # Each sequence in the generation phase only takes one token slot. - # Therefore, the number of batched tokens is equal to the number of - # sequences in the RUNNING state. 
- num_batched_tokens = sum( - seq_group.num_seqs(status=SequenceStatus.RUNNING) - for seq_group in self.running) - - scheduler_outputs = SchedulerOutputs( - scheduled_seq_groups=[ - ScheduledSequenceGroup(seq_group=running_group, - token_chunk_size=1) - for running_group in self.running - ], - num_prefill_groups=0, - num_batched_tokens=num_batched_tokens, - blocks_to_swap_in=blocks_to_swap_in, - blocks_to_swap_out=blocks_to_swap_out, - blocks_to_copy=blocks_to_copy, - ignored_seq_groups=[], - num_lookahead_slots=self._get_num_lookahead_slots( - is_prefill=False), - ) - return scheduler_outputs - def _schedule(self) -> SchedulerOutputs: """Schedule queued requests.""" if self.scheduler_config.chunked_prefill_enabled: - # s = time.time() - result = self._schedule_chunked_prefill() - # print(f"scheduler iter takes {(time.time() - s) * 1000} ms") - return result + return self._schedule_chunked_prefill() else: - # s = time.time() return self._schedule_default() - # return self._schedule_before_regression() - # print(f"scheduler iter takes {(time.time() - s) * 1000} ms") - # return result def _can_append_slots(self, seq_group: SequenceGroup) -> bool: """Determine whether or not we have enough space in the KV cache to From eb6ee7a0429cd5726e2fcb16feb7a1ee7fc0c92e Mon Sep 17 00:00:00 2001 From: sang Date: Mon, 22 Apr 2024 21:45:57 -0700 Subject: [PATCH 7/7] fixed --- tests/core/test_scheduler.py | 3 ++- vllm/core/scheduler.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index a2511238506b0..ab471d206618b 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -563,7 +563,8 @@ def cannot_append_second_group(seq_group, num_lookahead_slots): assert len(output.preempted) == 2 # Verify budgets are updated. assert budget.num_batched_tokens == 1 - assert budget.num_curr_seqs == 1 + # NOTE: When enable_chunk is False, num_seqs budget is not updated. + # assert budget.num_curr_seqs == 1 # Both should be preempted, not swapped. assert output.blocks_to_swap_out == {} # Nothing is copied. diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 8b36392b81748..99f7a34d336a4 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -443,7 +443,7 @@ def _schedule_running( # expensive. For the default scheduling chase where # enable_chunking is False, num_seqs are updated before running # this method, so we don't have to update it again here. - if not enable_chunking: + if enable_chunking: num_running_seqs = seq_group.get_max_num_running_seqs() budget.add_num_seqs(seq_group.request_id, num_running_seqs) if curr_loras is not None and seq_group.lora_int_id > 0:
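
Net effect of patches 5-7 on the default scheduling path: _schedule_running() charges per-request sequence counts to the scheduling budget only when chunked prefill is enabled, because get_max_num_running_seqs() is comparatively expensive and, per the comment added in the patch, the default path has already updated the budget's sequence count before this method runs. A minimal sketch of that bookkeeping pattern, using simplified stand-ins for vllm's SchedulingBudget and SequenceGroup rather than the real classes:

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Budget:
    # Simplified stand-in for vllm's SchedulingBudget: per-request token and
    # sequence counts charged during one scheduling iteration.
    _num_batched_tokens: Dict[str, int] = field(default_factory=dict)
    _num_seqs: Dict[str, int] = field(default_factory=dict)

    def add_num_batched_tokens(self, request_id: str, tokens: int) -> None:
        self._num_batched_tokens[request_id] = tokens

    def add_num_seqs(self, request_id: str, num_seqs: int) -> None:
        self._num_seqs[request_id] = num_seqs

    @property
    def num_batched_tokens(self) -> int:
        return sum(self._num_batched_tokens.values())

    @property
    def num_curr_seqs(self) -> int:
        return sum(self._num_seqs.values())


@dataclass
class Group:
    # Simplified stand-in for a SequenceGroup in the decode phase.
    request_id: str
    max_num_running_seqs: int  # what get_max_num_running_seqs() would return


def schedule_running(running: List[Group], budget: Budget,
                     enable_chunking: bool) -> None:
    # Each decoding group consumes exactly one token slot per step.
    for group in running:
        budget.add_num_batched_tokens(group.request_id, 1)
        # Only the chunked-prefill path charges sequence counts here; the
        # default path is assumed to have updated the budget's sequence
        # count before calling this function, as the patch's comment states.
        if enable_chunking:
            budget.add_num_seqs(group.request_id, group.max_num_running_seqs)


budget = Budget()
schedule_running([Group("a", 1), Group("b", 2)], budget, enable_chunking=False)
assert budget.num_batched_tokens == 2 and budget.num_curr_seqs == 0

With enable_chunking=False the budget's sequence count is left to the caller, which is why the final patch relaxes the budget.num_curr_seqs assertion in tests/core/test_scheduler.py.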
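
Patch 5 also adds a constant-time fast path to SequenceGroup.num_seqs() in vllm/sequence.py: when no status filter is requested, the answer is simply the size of seqs_dict, so the filtered sequence list never needs to be built. A self-contained sketch of the same idea (SeqGroup here is a simplified stand-in, not vllm's SequenceGroup):

from enum import Enum, auto
from typing import Dict, Optional


class Status(Enum):
    WAITING = auto()
    RUNNING = auto()
    FINISHED = auto()


class SeqGroup:
    def __init__(self, statuses: Dict[int, Status]) -> None:
        # seq_id -> status, mirroring a seqs_dict keyed by sequence id.
        self.seqs_dict = statuses

    def num_seqs(self, status: Optional[Status] = None) -> int:
        # Fast path: with no status filter the count is just the dict size,
        # so we skip building an intermediate filtered list.
        if status is None:
            return len(self.seqs_dict)
        return sum(1 for s in self.seqs_dict.values() if s is status)


group = SeqGroup({0: Status.RUNNING, 1: Status.WAITING, 2: Status.RUNNING})
assert group.num_seqs() == 3
assert group.num_seqs(Status.RUNNING) == 2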
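
Patches 2 through 4 temporarily wrap the _schedule() call with time.time() and a print to measure per-iteration scheduler latency, then remove that instrumentation again before the series is finalized. A reusable, hypothetical version of the same measurement (not part of vllm) could look like:

import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def timed(label: str) -> Iterator[None]:
    # Print how long the wrapped block took, in milliseconds.
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"{label} takes {elapsed_ms:.3f} ms")


with timed("scheduler iter"):
    sum(range(10_000))  # stand-in for a call such as self._schedule_default()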