Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Optimize block_manager_v2 vs block_manager_v1 (to make V2 default) #5602

Merged
merged 48 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit. Hold shift + click to select a range
007b32d
Optimize block_manager_v2 so it becomes the default
alexm-redhat Jun 14, 2024
ea94e85
cleanups
alexm-redhat Jun 19, 2024
e21c410
refactor code so that only free() is used
alexm-redhat Jun 20, 2024
b5872d2
prefix_caching: refactor self._blocks to tracked blocks
alexm-redhat Jun 21, 2024
54d76ba
format
alexm-redhat Jun 21, 2024
0aecdb2
cpu bug fix
alexm-redhat Jun 21, 2024
d649055
fixes
alexm-redhat Jun 21, 2024
92550b0
fixes
alexm-redhat Jun 21, 2024
4100268
fix immutable promotion
alexm-redhat Jun 21, 2024
8812380
fix last access bug
alexm-redhat Jun 21, 2024
5d12f4f
format
alexm-redhat Jun 21, 2024
ae3dde4
revert offline_inference.py
alexm-redhat Jun 21, 2024
4f29bff
fixes
alexm-redhat Jun 21, 2024
8f8fb66
Cade review comments for fixing append_token_ids
alexm-redhat Jun 21, 2024
7a52d34
cleanup pass
alexm-redhat Jun 21, 2024
6a2b897
sync
alexm-redhat Jun 21, 2024
73c13be
fix test_block_table.py
alexm-redhat Jun 21, 2024
0bbc049
fix swap_in/swap_out to actually replace the blocks
alexm-redhat Jun 21, 2024
c9c7070
fixes
alexm-redhat Jun 21, 2024
fd802b0
fix block table tests
alexm-redhat Jun 21, 2024
5a48c1e
ping
alexm-redhat Jun 21, 2024
1ffa4bf
tmp towards cow/promo back in block
alexm-redhat Jun 26, 2024
38e8e21
sync
alexm-redhat Jun 27, 2024
994e972
works
alexm-redhat Jun 27, 2024
ad83158
format
alexm-redhat Jun 27, 2024
a594be3
sync
alexm-redhat Jun 27, 2024
e72cd50
review fixes from Cade
alexm-redhat Jun 27, 2024
87d14e2
cleanup
alexm-redhat Jun 27, 2024
80e6ab1
test fixes
alexm-redhat Jun 27, 2024
6dcd304
fix sequence prompt/output token ids to be updated properly with the …
alexm-redhat Jun 28, 2024
6c410d8
fix a block_table bug
alexm-redhat Jun 29, 2024
f97cffa
fix the num_token_ids bug by separating num_token_ids and num_tokens_…
alexm-redhat Jun 29, 2024
b74d834
Refactor back token_ids based on Cade comments.
alexm-redhat Jun 29, 2024
179542b
use tuples for seq_data prompt/output token_ids
alexm-redhat Jun 29, 2024
7c0ce65
sync
alexm-redhat Jun 29, 2024
4dd957e
fix
alexm-redhat Jun 29, 2024
325226f
fix tests
alexm-redhat Jun 29, 2024
29e9683
fix tests
alexm-redhat Jun 30, 2024
c36f353
add Antoni's idea for improving caching of computed block ids by usin…
alexm-redhat Jun 30, 2024
d0b2ef9
Based on Cade comment, refactored the seq last_access and cached comp…
alexm-redhat Jun 30, 2024
bd65468
cleanup
alexm-redhat Jun 30, 2024
3064208
Cade's comments
alexm-redhat Jun 30, 2024
2236d5e
fix test
alexm-redhat Jun 30, 2024
4ea6938
fix fork_seq
alexm-redhat Jun 30, 2024
82b31e8
ping
alexm-redhat Jun 30, 2024
3f1c2a1
ping2
alexm-redhat Jun 30, 2024
2ff442d
Cade's comments
alexm-redhat Jul 1, 2024
3322f8c
more Cade comments
alexm-redhat Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions benchmarks/benchmark_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def main(args: argparse.Namespace):
load_format=args.load_format,
distributed_executor_backend=args.distributed_executor_backend,
otlp_traces_endpoint=args.otlp_traces_endpoint,
enable_prefix_caching=args.enable_prefix_caching,
)

sampling_params = SamplingParams(
Expand Down Expand Up @@ -220,6 +221,9 @@ def run_to_completion(profile_dir: Optional[str] = None):
action='store_true',
help='If True, the prefill requests can be chunked based on the '
'max_num_batched_tokens')
parser.add_argument("--enable-prefix-caching",
action='store_true',
help="Enable automatic prefix caching")
parser.add_argument('--use-v2-block-manager', action='store_true')
parser.add_argument(
"--ray-workers-use-nsight",
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def generate(
req_sample_output_strs: List[str] = []
for sample in req_output.outputs:
output_str = sample.text
output_ids = sample.token_ids
output_ids = list(sample.token_ids)
req_sample_output_ids.append(prompt_ids + output_ids)
req_sample_output_strs.append(prompt_str + output_str)
outputs.append((req_sample_output_ids, req_sample_output_strs))
Expand Down
5 changes: 3 additions & 2 deletions tests/core/block/test_block_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,9 @@ def test_cow(block_size: int, sequence_len: int, append_len: int,
block_size) - (sequence_len // block_size)

original_block_table.allocate(token_ids=token_ids, device=Device.GPU)
original_block_ids = original_block_table.physical_block_ids
original_block_ids = original_block_table.physical_block_ids[:]

print("original_block_ids = {}".format(original_block_ids))
forked_block_table = original_block_table.fork()

# Expect no additional allocation (copy on _write_).
Expand Down Expand Up @@ -457,7 +458,7 @@ def test_cow_lookahead_simple(block_size: int, sequence_len: int,

# Allocate lookahead slots.
original_block_table.ensure_num_empty_slots(lookahead_slots)
original_block_ids = original_block_table.physical_block_ids
original_block_ids = original_block_table.physical_block_ids[:]

forked_block_table = original_block_table.fork()

Expand Down
24 changes: 12 additions & 12 deletions tests/core/block/test_cpu_gpu_block_allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
@pytest.mark.parametrize("num_gpu_blocks", [1024])
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
def test_allocate_mutable_block(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
Expand All @@ -21,14 +21,14 @@ def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

cpu_blocks = [
allocator.allocate_mutable(prev_block=None, device=Device.CPU)
allocator.allocate_mutable_block(prev_block=None, device=Device.CPU)
for _ in range(num_cpu_blocks)
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

gpu_blocks = [
allocator.allocate_mutable(prev_block=None, device=Device.GPU)
allocator.allocate_mutable_block(prev_block=None, device=Device.GPU)
for _ in range(num_gpu_blocks)
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
Expand All @@ -47,8 +47,8 @@ def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
@pytest.mark.parametrize("num_gpu_blocks", [1024])
@pytest.mark.parametrize("block_size", [2])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_allocate_immutable(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
def test_allocate_immutable_block(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
Expand All @@ -67,18 +67,18 @@ def test_allocate_immutable(num_cpu_blocks: int, num_gpu_blocks: int,
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

cpu_blocks = [
allocator.allocate_immutable(prev_block=None,
token_ids=token_ids,
device=Device.CPU)
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids,
device=Device.CPU)
for token_ids in cpu_token_ids
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

gpu_blocks = [
allocator.allocate_immutable(prev_block=None,
token_ids=token_ids,
device=Device.GPU)
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids,
device=Device.GPU)
for token_ids in gpu_token_ids
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
Expand Down
6 changes: 3 additions & 3 deletions tests/core/block/test_naive_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def create_allocate_lambda(allocate_type: str,
prev_block: Optional[Block],
token_ids: List[int]):
if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable(
allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids)
elif allocate_type == "mutable":
allocate_block = lambda: allocator.allocate_mutable(prev_block=
prev_block)
allocate_block = lambda: allocator.allocate_mutable_block(
prev_block=prev_block)
else:
raise ValueError()

Expand Down
106 changes: 67 additions & 39 deletions tests/core/block/test_prefix_caching_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ def test_first_block_has_correct_content_hash(seed: int, block_size: int,
token_ids = list(range(num_to_fill))
mock_allocator = MagicMock(spec=PrefixCachingBlockAllocator)

block_with_prev = PrefixCachingBlock(
prev_block=None,
token_ids=token_ids,
block_size=block_size,
prefix_caching_allocator=mock_allocator)
block_with_prev = PrefixCachingBlock(prev_block=None,
token_ids=token_ids,
block_size=block_size,
allocator=mock_allocator)

if is_curr_block_full:
# Expect hash since block is full.
Expand Down Expand Up @@ -71,7 +70,7 @@ def test_nth_block_has_correct_content_hash(seed: int, block_size: int,
prev_block=previous_block,
token_ids=token_ids,
block_size=block_size,
prefix_caching_allocator=mock_allocator,
allocator=mock_allocator,
)

if is_curr_block_full and prev_block_has_hash:
Expand Down Expand Up @@ -138,7 +137,7 @@ def create_chain(block_size: int,
prev_block=prev_block,
token_ids=[],
block_size=block_size,
prefix_caching_allocator=allocator,
allocator=allocator,
)

tokens_to_append = token_ids[block_number *
Expand All @@ -159,11 +158,11 @@ def create_allocate_lambda(allocate_type: str, allocator: BlockAllocator,
prev_block: Optional[Block],
token_ids: List[int]):
if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable(
allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids)
elif allocate_type == "mutable":
allocate_block = lambda: allocator.allocate_mutable(prev_block=
prev_block)
allocate_block = lambda: allocator.allocate_mutable_block(
prev_block=prev_block)
else:
raise ValueError()

Expand Down Expand Up @@ -233,12 +232,13 @@ def test_allocate_immutable_ooms_many_hash(num_blocks: int,

# Expect allocation with unseen hash to fail.
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_immutable(prev_block=chain[-1],
token_ids=list(range(block_size)))
allocator.allocate_immutable_block(prev_block=chain[-1],
token_ids=list(
range(block_size)))

# Expect mutable allocation to fail.
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_mutable(prev_block=chain[-1])
allocator.allocate_mutable_block(prev_block=chain[-1])

# Expect allocation of exact same chain to pass.
second_chain = TestPrefixCachingBlockAllocator.create_immutable_chain(
Expand Down Expand Up @@ -270,7 +270,7 @@ def test_free_prevents_oom(num_blocks: int, block_size: int):

# Expect mutable allocation to fail.
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_mutable(prev_block=None)
allocator.allocate_mutable_block(prev_block=None)

block_to_free = chain[-1]

Expand All @@ -280,11 +280,11 @@ def test_free_prevents_oom(num_blocks: int, block_size: int):
allocator.free(block_to_free)
assert block_to_free.block_id is None, i

new_block = allocator.allocate_mutable(prev_block=None)
new_block = allocator.allocate_mutable_block(prev_block=None)
assert new_block.block_id == block_id, i

with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocator.allocate_mutable(prev_block=None)
allocator.allocate_mutable_block(prev_block=None)

block_to_free = new_block

Expand Down Expand Up @@ -376,17 +376,13 @@ def test_get_common_computed_block_ids(num_blocks: int, block_size: int,

# Create token ids that will exhaust all blocks.
token_ids = list(range(num_blocks_to_consume * block_size))
blocks = list(range(num_blocks_to_consume))

first_chain = TestPrefixCachingBlockAllocator.create_immutable_chain(
block_size=block_size,
token_ids=token_ids,
allocator=allocator,
)

# mark all blocks in first chain as computed
allocator.mark_blocks_as_computed(blocks)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO(cade) see why this api is no longer required


# After zero_point, second_chain's token_ids would be set -1, which
# make it different from here comparing with first_chain
zero_point = random.randint(1, len(token_ids) - 1)
Expand Down Expand Up @@ -424,15 +420,16 @@ def test_alloc_promotion(num_blocks: int, block_size: int, seed: int):
block_size=block_size)
token_ids = list(range(block_size))

block = allocator.allocate_immutable(prev_block=None,
token_ids=token_ids)
block = allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids)

assert allocator._refcounter.get(block.block_id) == 1
m = allocator.allocate_mutable(prev_block=None)
m = allocator.allocate_mutable_block(prev_block=None)

block_id = m.block_id
for i in range(block_size):
m.append_token_ids([i])

# After block get promoted to immutable from mutable, if there is
# already same content hash block, then it shall be released into
# hashless_allocator
Expand All @@ -452,48 +449,79 @@ def test_eviction_alloc_mixed(num_blocks: int, block_size: int, seed: int):

all_blocks_list = [i for i in range(num_blocks)]
zero_ref = {i: 0 for i in range(num_blocks)}
one_ref = {i: 1 for i in range(num_blocks)}
allocator = PrefixCachingBlockAllocator(num_blocks=num_blocks,
block_size=block_size)
token_ids = list(range(num_blocks * block_size))

# now we have num_blocks free blocks in hashless allocator
# with internal tracking list _blocks _cached_blocks and evictor
# empty and block's ref shall be 0
# Verify initial/pre-alloc state

# Ensure all blocks are free inside hashless allocator
assert list(allocator._hashless_allocator._free_block_indices
) == all_blocks_list
assert len(allocator._blocks.keys()) == 0
# Ensure no tracked blocks
assert len(allocator._block_tracker.keys()) == num_blocks
for block_id in range(num_blocks):
assert not allocator._block_tracker[block_id].active
# Ensure no cached blocks
assert len(allocator._cached_blocks.values()) == 0
# Ensure no evicted blocks
assert len(allocator.evictor.free_table.keys()) == 0
# Ensure 0s ref counts for all blocks
assert allocator._refcounter._refcounts == zero_ref

# Allocate immutable chains with only one block residuled in
new_block = []
for i in range(num_blocks):
block = allocator.allocate_immutable(
block = allocator.allocate_immutable_block(
prev_block=None,
token_ids=token_ids[block_size * i:block_size * (i + 1)])
new_block.append(block)

# Verify post-alloc state

# Ensure no blocks are free inside hashless allocator
assert (len(allocator._hashless_allocator._free_block_indices) == 0)
# Ensure all blocks are tracked
assert len(allocator._block_tracker.keys()) == num_blocks
for block_id in range(num_blocks):
assert allocator._block_tracker[block_id].active
# Ensure all blocks are cached (all promoted)
assert len(allocator._cached_blocks.values()) == num_blocks
# Ensure no evicted blocks
assert len(allocator.evictor.free_table.keys()) == 0
# Ensure 1s ref counts for all blocks
assert allocator._refcounter._refcounts == one_ref

# Free all blocks, and now all blocks shall be in the evictor
# there shall be no tracking data left in _blocks
# there shall be no tracking data left in _block_tracker
# all blocks shall be tracked in _cached_blocks
# all blocks' ref shall be zero
for block in new_block:
allocator.free(block)

assert len(allocator._blocks.keys()) == 0
# Verify post-free state

# Ensure no tracked blocks
assert len(allocator._block_tracker.keys()) == num_blocks
for block_id in range(num_blocks):
assert not allocator._block_tracker[block_id].active
# Ensure no blocks in hashless allocator (all promoted)
assert len(allocator._hashless_allocator._free_block_indices) == 0
# Ensure all blocks are cached
assert list(allocator._cached_blocks.values()) == all_blocks_list
# Ensure all blocks are inside the evictor
assert list(allocator.evictor.free_table.keys()) == all_blocks_list
# Ensure 0s refcounts
assert allocator._refcounter._refcounts == zero_ref

# Allocate a mutable block, and the first block shall be evicted
# and set its content hash into None, ref to 1
mutable = allocator.allocate_mutable(prev_block=None)
mutable = allocator.allocate_mutable_block(prev_block=None)

assert mutable.block_id == 0
assert mutable.content_hash is None
assert 0 in allocator._blocks
assert allocator._block_tracker[0].active
assert allocator._refcounter.get(0) == 1
assert 0 not in allocator._cached_blocks
assert 0 not in allocator.evictor
Expand All @@ -502,27 +530,27 @@ def test_eviction_alloc_mixed(num_blocks: int, block_size: int, seed: int):
# hashless allocator
allocator.free(mutable)

assert len(allocator._blocks.keys()) == 0
assert not allocator._block_tracker[0].active
assert allocator._refcounter._refcounts == zero_ref
assert 0 not in allocator._cached_blocks
assert 0 not in allocator.evictor
assert 0 in allocator._hashless_allocator._free_block_indices

# when allocate immutable with first block_size tokens, we
# When allocate immutable with first block_size tokens, we
# shall get free block from hashless allocator, thus no block left
# in hashless
block = allocator.allocate_immutable(prev_block=None,
token_ids=token_ids[:block_size])
block = allocator.allocate_immutable_block(
prev_block=None, token_ids=token_ids[:block_size])

assert block.block_id == 0
assert len(allocator._hashless_allocator._free_block_indices) == 0
assert 0 in allocator._blocks
assert allocator._block_tracker[0].active
assert 0 in allocator._cached_blocks.values()
assert allocator._refcounter.get(0) == 1
assert 0 not in allocator.evictor

# allocate mutable block again, it shall be popped from evictor
mutable = allocator.allocate_mutable(prev_block=None)
mutable = allocator.allocate_mutable_block(prev_block=None)
assert len(allocator._hashless_allocator._free_block_indices) == 0
assert mutable.block_id not in allocator.evictor.free_table
assert allocator._refcounter.get(mutable.block_id) == 1
Expand Down Expand Up @@ -619,7 +647,7 @@ def create_immutable_chain(
block_token_ids = token_ids[block_number *
block_size:(block_number + 1) *
block_size]
prev_block = allocator.allocate_immutable(
prev_block = allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=block_token_ids)
blocks.append(prev_block)

Expand Down
8 changes: 4 additions & 4 deletions tests/spec_decode/test_batch_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ def test_create_single_target_seq_group_metadata(k: int):

assert output.request_id == input_seq_group_metadata.request_id
assert len(output.seq_data) == 1
assert output.seq_data[target_seq_id].get_prompt_token_ids(
) == prompt_tokens
assert output.seq_data[target_seq_id].get_output_token_ids(
) == prev_output_tokens + token_ids
assert output.seq_data[target_seq_id].get_prompt_token_ids() == tuple(
prompt_tokens)
assert output.seq_data[target_seq_id].get_output_token_ids() == tuple(
prev_output_tokens + token_ids)

assert len(output.block_tables) == 1
assert output.block_tables[
Expand Down
Loading
Loading