
Add an option to launch cacheflow without ray (vllm-project#51)
zhuohan123 authored Apr 30, 2023
1 parent f75e56d commit e0770fe
Showing 7 changed files with 102 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -3,8 +3,11 @@
*.egg-info/
*.eggs/
*.so
*.log
*.csv
build/

*.pkl
*.png
**/log.txt
.vscode/
12 changes: 8 additions & 4 deletions benchmark/benchmark_latency.py
@@ -8,7 +8,8 @@

from cacheflow.master.simple_frontend import SimpleFrontend
from cacheflow.master.server import (Server, add_server_arguments,
initialize_ray_cluster)
process_server_arguments,
initialize_cluster)
from cacheflow.sampling_params import SamplingParams
from cacheflow.utils import get_gpu_memory, get_cpu_memory

@@ -20,8 +21,8 @@ def main(args: argparse.Namespace):

(num_nodes, num_devices_per_node, distributed_init_method,
all_stage_devices) = (
initialize_ray_cluster(
address='local',
initialize_cluster(
use_ray=args.use_ray,
pipeline_parallel_size=args.pipeline_parallel_size,
tensor_parallel_size=args.tensor_parallel_size))

@@ -44,6 +45,7 @@ def main(args: argparse.Namespace):
all_stage_devices=all_stage_devices,
gpu_memory=get_gpu_memory(),
cpu_memory=get_cpu_memory(),
use_ray=args.use_ray,
)

# Create a frontend.
@@ -91,14 +93,16 @@ def profile_step(profile=False):


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='CacheFlow simple server.')
parser = argparse.ArgumentParser(
description='Benchmark the latency of decoding a single sentence.')
parser = add_server_arguments(parser)
parser.add_argument('--input-len', type=int, default=32)
parser.add_argument('--output-len', type=int, default=128)
parser.add_argument('--batch-size', type=int, default=8)
parser.add_argument('--n', type=int, default=1)
parser.add_argument('--use-beam-search', action='store_true')
args = parser.parse_args()
args = process_server_arguments(args)
args.max_num_batched_tokens = max(
args.max_num_batched_tokens, args.batch_size * args.input_len)
print(args)
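
A small aside on the token-budget line near the end of this hunk: the benchmark raises max_num_batched_tokens so that one full batch of prompts fits into a single scheduler iteration. A minimal sketch with this script's defaults (--input-len 32, --batch-size 8); the server-side default used below is a made-up placeholder, since it is not visible in this diff:

    # Illustration only; 2560 is a placeholder for the server's default budget.
    max_num_batched_tokens = 2560
    batch_size, input_len = 8, 32
    max_num_batched_tokens = max(max_num_batched_tokens,
                                 batch_size * input_len)   # 8 * 32 = 256, so 2560 stands
    # A larger run, e.g. --batch-size 64 --input-len 512, would raise the
    # floor to 64 * 512 = 32768 and override the default.
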
16 changes: 10 additions & 6 deletions benchmark/benchmark_text_completion.py
@@ -11,7 +11,8 @@
from benchmark.trace import generate_text_completion_requests
from cacheflow.master.simple_frontend import SimpleFrontend
from cacheflow.master.server import (Server, add_server_arguments,
initialize_ray_cluster)
process_server_arguments,
initialize_cluster)
from cacheflow.sampling_params import SamplingParams
from cacheflow.utils import get_gpu_memory, get_cpu_memory

@@ -25,8 +26,8 @@ def main(args: argparse.Namespace):

(num_nodes, num_devices_per_node, distributed_init_method,
all_stage_devices) = (
initialize_ray_cluster(
address='local',
initialize_cluster(
use_ray=args.use_ray,
pipeline_parallel_size=args.pipeline_parallel_size,
tensor_parallel_size=args.tensor_parallel_size))

@@ -49,6 +50,7 @@ def main(args: argparse.Namespace):
all_stage_devices=all_stage_devices,
gpu_memory=get_gpu_memory(),
cpu_memory=get_cpu_memory(),
use_ray=args.use_ray,
collect_stats=True,
do_memory_analysis=args.do_memory_analysis,
)
@@ -134,7 +136,7 @@ def main(args: argparse.Namespace):
finished.append({
'group_id': seq_group.group_id,
'seq_id': seq.seq_id,
'arrival_time': arrival_time,
'arrival_time': arrival_time,
'finish_time': finish_time,
'prompt_len': seq.prompt_len,
'output_len': output_len,
@@ -225,8 +227,9 @@ def get_sampling_dir_name(


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='CacheFlow simple server.')
parser = add_server_arguments(parser)
parser = argparse.ArgumentParser(
description='Benchmark the performance on a series of requests.')
parser = add_server_arguments(parser)
parser.add_argument('--output-dir', type=str, help='path to output directory', default=None)

parser.add_argument('--dataset', type=str, help='path to dataset', required=True)
@@ -246,6 +249,7 @@ def get_sampling_dir_name(
parser.add_argument('--n6-beam', type=float, help='ratio of requests with n=6 & beam search', default=0.0)
parser.add_argument('--n8-beam', type=float, help='ratio of requests with n=8 & beam search', default=0.0)
args = parser.parse_args()
args = process_server_arguments(args)
if args.n1 + args.n2 + args.n3 + args.n4 + args.n6 + args.n2_beam + args.n4_beam + args.n6_beam + args.n8_beam != 1.0:
raise ValueError('The ratios of requests must sum to 1.')

18 changes: 15 additions & 3 deletions cacheflow/http_frontend/fastapi_frontend.py
@@ -13,7 +13,8 @@
from cacheflow.sampling_params import SamplingParams
from cacheflow.sequence import Sequence, SequenceGroup
from cacheflow.master.server import (Server, add_server_arguments,
initialize_ray_cluster)
process_server_arguments,
initialize_cluster)
from cacheflow.worker.controller import DeviceID
from cacheflow.utils import Counter, get_gpu_memory, get_cpu_memory

@@ -33,17 +34,22 @@ def __init__(
seed: int,
swap_space: int,
max_num_batched_tokens: int,
max_num_sequences: int,
num_nodes: int,
num_devices_per_node: int,
distributed_init_method: str,
all_stage_devices: List[List[DeviceID]],
server_use_ray: bool,
):
self.block_size = block_size

self.tokenizer = AutoTokenizer.from_pretrained(model)
self.seq_group_counter = Counter()
self.seq_counter = Counter()
remote_server_class = ray.remote(num_cpus=0)(Server)
if server_use_ray:
remote_server_class = ray.remote(num_cpus=0)(Server)
else:
remote_server_class = ray.remote(num_gpus=1)(Server)
self.server = remote_server_class.remote(
model=model,
model_path=model_path,
@@ -55,12 +61,14 @@ def __init__(
seed=seed,
swap_space=swap_space,
max_num_batched_tokens=max_num_batched_tokens,
max_num_sequences=max_num_sequences,
num_nodes=num_nodes,
num_devices_per_node=num_devices_per_node,
distributed_init_method=distributed_init_method,
all_stage_devices=all_stage_devices,
gpu_memory=get_gpu_memory(),
cpu_memory=get_cpu_memory(),
use_ray=server_use_ray,
)

self.running_seq_groups: Dict[int, SequenceGroup] = {}
@@ -149,14 +157,16 @@ async def generate_stream(request: Request):
parser.add_argument("--port", type=int, default=10002)
parser = add_server_arguments(parser)
args = parser.parse_args()
args = process_server_arguments(args)

# TODO(zhuohan): Support pipeline parallelism.
assert args.pipeline_parallel_size == 1, (
'Pipeline parallelism is not supported yet.')

(num_nodes, num_devices_per_node, distributed_init_method,
all_stage_devices) = (
initialize_ray_cluster(
initialize_cluster(
use_ray=True,
pipeline_parallel_size=args.pipeline_parallel_size,
tensor_parallel_size=args.tensor_parallel_size))

@@ -170,10 +180,12 @@ async def generate_stream(request: Request):
seed=args.seed,
swap_space=args.swap_space,
max_num_batched_tokens=args.max_num_batched_tokens,
max_num_sequences=args.max_num_sequences,
num_nodes=num_nodes,
num_devices_per_node=num_devices_per_node,
distributed_init_method=distributed_init_method,
all_stage_devices=all_stage_devices,
server_use_ray=args.use_ray,
)

uvicorn.run(app, host=args.host, port=args.port, log_level="info")
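
One point worth calling out in the frontend diff above: the FastAPI frontend still constructs the Server as a Ray actor in both branches, so Ray itself remains a requirement for the HTTP path; the new option only changes how the actor is resourced. A commented restatement of that branch, a sketch of the pattern in this diff with the other constructor arguments elided:

    if server_use_ray:
        # The Server only coordinates; its Workers are separate Ray actors
        # that occupy the GPUs, so the Server actor needs no GPU itself.
        remote_server_class = ray.remote(num_cpus=0)(Server)
    else:
        # Single-GPU mode: the Server runs its Worker in-process, so the
        # actor itself must be scheduled onto a GPU.
        remote_server_class = ray.remote(num_gpus=1)(Server)
    server = remote_server_class.remote(...)  # same keyword arguments as in the diff
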
43 changes: 39 additions & 4 deletions cacheflow/master/server.py
@@ -1,8 +1,12 @@
import argparse
from typing import List, Tuple
from typing import List, Tuple, Optional
import random

import ray
import torch
try:
import ray
except ImportError:
ray = None

from cacheflow.master.scheduler import Scheduler
from cacheflow.models import get_memory_analyzer
@@ -31,13 +35,18 @@ def __init__(
all_stage_devices: List[List[DeviceID]],
gpu_memory: int,
cpu_memory: int,
use_ray: bool,
collect_stats: bool = False,
do_memory_analysis: bool = False,
):
self.num_nodes = num_nodes
self.num_devices_per_node = num_devices_per_node
self.world_size = pipeline_parallel_size * tensor_parallel_size

if not use_ray:
assert self.world_size == 1, (
"Only support single GPU without Ray.")

self.memory_analyzer = get_memory_analyzer(
model_name=model,
block_size=block_size,
@@ -72,6 +81,7 @@ def __init__(
model_path=model_path,
use_dummy_weights=use_dummy_weights,
max_num_batched_tokens=max_num_batched_tokens,
use_ray=use_ray,
)
self.controllers.append(controller)

@@ -105,11 +115,30 @@ def has_unfinished_requests(self):
self.scheduler.swapped)


def initialize_ray_cluster(
address: str = 'auto',
def initialize_cluster(
use_ray: bool = False,
address: Optional[str] = None,
pipeline_parallel_size: int = 1,
tensor_parallel_size: int = 1,
) -> Tuple[int, int, str, List[List[DeviceID]]]:
# Initialize cluster locally.
if not use_ray:
assert pipeline_parallel_size * tensor_parallel_size == 1, (
"Only support single GPU without Ray.")
num_nodes = 1
num_devices_per_node = torch.cuda.device_count()
port = random.randint(10000, 20000)
# We need to setup the distributed init method to make sure
# the distributed megatron code (e.g., get world size) works correctly.
distributed_init_method = f"tcp://localhost:{port}"
all_stage_devices = [[(0, None, 0)]]
return (num_nodes, num_devices_per_node, distributed_init_method,
all_stage_devices)

assert ray is not None, (
"Ray is not installed. Please install Ray to use distributed "
"serving.")

# Connect to a ray cluster.
ray.init(address=address)

@@ -177,6 +206,7 @@ def add_server_arguments(parser: argparse.ArgumentParser):
parser.add_argument('--model-path', type=str, default='~/.cacheflow/model_weights',
help='model path to download and load the weights')
# Parallel arguments
parser.add_argument('--use-ray', action='store_true', help='use Ray for distributed serving, will be automatically set when using more than 1 GPU')
parser.add_argument('--pipeline-parallel-size', '-pp', type=int, default=1, help='number of pipeline stages')
parser.add_argument('--tensor-parallel-size', '-tp', type=int, default=1, help='number of tensor parallel replicas')
# KV cache arguments
@@ -190,3 +220,8 @@ def add_server_arguments(parser: argparse.ArgumentParser):
parser.add_argument('--max-num-sequences', type=int, default=256, help='maximum number of sequences per iteration')
parser.add_argument('--use-dummy-weights', action='store_true', help='use dummy values for model weights')
return parser

def process_server_arguments(args: argparse.Namespace):
if args.pipeline_parallel_size * args.tensor_parallel_size > 1:
args.use_ray = True
return args
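
To make the new entry points above concrete, here is a rough sketch of the intended call flow for the no-Ray path, pieced together from this diff; the surrounding script and argument values are illustrative, and Server construction is omitted:

    import argparse

    from cacheflow.master.server import (add_server_arguments,
                                         process_server_arguments,
                                         initialize_cluster)

    parser = argparse.ArgumentParser()
    parser = add_server_arguments(parser)
    args = parser.parse_args()               # e.g. launched with defaults: -pp 1, -tp 1
    args = process_server_arguments(args)    # 1 * 1 == 1, so use_ray stays False

    (num_nodes, num_devices_per_node, distributed_init_method,
     all_stage_devices) = initialize_cluster(
         use_ray=args.use_ray,
         pipeline_parallel_size=args.pipeline_parallel_size,
         tensor_parallel_size=args.tensor_parallel_size)

    # With use_ray=False this returns, per the code above:
    #   num_nodes = 1
    #   num_devices_per_node = torch.cuda.device_count()
    #   distributed_init_method = "tcp://localhost:<random port in 10000-20000>"
    #   all_stage_devices = [[(0, None, 0)]]   # a single (rank, node, device) entry
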
30 changes: 21 additions & 9 deletions cacheflow/worker/controller.py
@@ -1,6 +1,9 @@
from typing import Dict, List, Union, Tuple

import ray
try:
import ray
except ImportError:
ray = None

from cacheflow.master.scheduler import Scheduler
from cacheflow.sequence import SequenceGroupInputs
@@ -29,24 +32,29 @@ def __init__(
model_path: str,
use_dummy_weights: bool,
max_num_batched_tokens: int,
use_ray: bool,
) -> None:
self.stage_id = stage_id
self.stage_devices = stage_devices
self.model_name = model_name
self.block_size = block_size
self.num_gpu_blocks = num_gpu_blocks
self.num_cpu_blocks = num_cpu_blocks
self.use_ray = use_ray

# Which pipeline stage is this node assigned to?
self.is_first_stage = stage_id == 0
self.is_last_stage = False

self.workers: List[Worker] = []
for rank, node_resource, device_id in stage_devices:
worker_cls = ray.remote(num_cpus=0,
num_gpus=1,
resources={node_resource: 1e-5})(Worker)
worker = worker_cls.remote(
if self.use_ray:
worker_cls = ray.remote(num_cpus=0,
num_gpus=1,
resources={node_resource: 1e-5})(Worker).remote
else:
worker_cls = Worker
worker = worker_cls(
model_name=model_name,
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
@@ -78,17 +86,21 @@ def execute_stage(
blocks_to_swap_out: Dict[int, int],
blocks_to_copy: Dict[int, List[int]],
) -> None:
futures = []
all_outputs = []
for worker in self.workers:
future = worker.execute_stage.remote(
executor = (worker.execute_stage.remote
if self.use_ray else worker.execute_stage)
output = executor(
input_seq_groups,
blocks_to_swap_in,
blocks_to_swap_out,
blocks_to_copy,
)
futures.append(future)
all_outputs.append(output)

if self.use_ray:
all_outputs = ray.get(all_outputs)

all_outputs = ray.get(futures)
# Make sure all workers have the same results.
output = all_outputs[0]
for other_output in all_outputs[1:]:
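
The controller change above boils down to one dispatch pattern: either wrap Worker as a Ray actor class and call methods via .remote, collecting futures and resolving them with ray.get, or instantiate it directly and call methods synchronously. A self-contained toy sketch of that pattern, using a stand-in worker class rather than cacheflow's:

    try:
        import ray
    except ImportError:
        ray = None

    class ToyWorker:
        def execute_stage(self, x):
            return x * 2

    def make_worker(use_ray):
        if use_ray:
            # ray.remote(...) produces an actor class; .remote() constructs the actor.
            return ray.remote(num_cpus=1)(ToyWorker).remote()
        return ToyWorker()

    def execute_stage(workers, use_ray, x):
        all_outputs = []
        for worker in workers:
            executor = (worker.execute_stage.remote
                        if use_ray else worker.execute_stage)
            all_outputs.append(executor(x))     # ObjectRef if use_ray, plain value otherwise
        if use_ray:
            all_outputs = ray.get(all_outputs)  # resolve futures only on the Ray path
        return all_outputs

For example, execute_stage([make_worker(False)], False, 3) returns [6] synchronously, while the same call with use_ray=True runs each worker as a Ray actor and gathers the results at the end.
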
8 changes: 6 additions & 2 deletions simple_server.py
@@ -3,7 +3,8 @@

from cacheflow.master.simple_frontend import SimpleFrontend
from cacheflow.master.server import (Server, add_server_arguments,
initialize_ray_cluster)
process_server_arguments,
initialize_cluster)
from cacheflow.sampling_params import SamplingParams
from cacheflow.utils import get_gpu_memory, get_cpu_memory

@@ -14,7 +15,8 @@ def main(args: argparse.Namespace):

(num_nodes, num_devices_per_node, distributed_init_method,
all_stage_devices) = (
initialize_ray_cluster(
initialize_cluster(
use_ray=args.use_ray,
pipeline_parallel_size=args.pipeline_parallel_size,
tensor_parallel_size=args.tensor_parallel_size))

@@ -37,6 +39,7 @@ def main(args: argparse.Namespace):
all_stage_devices=all_stage_devices,
gpu_memory=get_gpu_memory(),
cpu_memory=get_cpu_memory(),
use_ray=args.use_ray,
)

# Create a frontend.
@@ -70,4 +73,5 @@
parser = argparse.ArgumentParser(description='CacheFlow simple server.')
parser = add_server_arguments(parser)
args = parser.parse_args()
args = process_server_arguments(args)
main(args)
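
Finally, a short sketch of how the new process_server_arguments step decides the launch mode for simple_server.py; the Args stand-in below is hypothetical, since in the real script argparse produces the namespace:

    from cacheflow.master.server import process_server_arguments

    class Args:                          # hypothetical stand-in for argparse.Namespace
        pipeline_parallel_size = 1
        tensor_parallel_size = 1
        use_ray = False

    args = process_server_arguments(Args())
    print(args.use_ray)                  # False: 1 * 1 == 1, single GPU, no Ray needed

    args.tensor_parallel_size = 2
    args = process_server_arguments(args)
    print(args.use_ray)                  # True: parallelism > 1 forces the Ray path

This mirrors the --use-ray help string added above: the flag is set automatically whenever more than one GPU is requested.
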
