Skip to content

Commit

Permalink
Executor & Scheduler Improvements
Browse files Browse the repository at this point in the history
* Executor is refactored so that cancelled tasks are tracked at the
  scheduler level instead of on the Task level itself
* Report generation with an executor is tested
* Some redundant code is removed
  • Loading branch information
Dmytro Parfeniuk committed Jul 17, 2024
1 parent f905346 commit 785aae6
Show file tree
Hide file tree
Showing 15 changed files with 422 additions and 202 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ install.dev:
python -m pip install -e .[dev]



.PHONY: build
build:
python setup.py sdist bdist_wheel
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ lint.select = ["E", "F", "W"]

[tool.pytest.ini_options]
addopts = '-s -vvv --cache-clear'
asyncio_mode = 'auto'
markers = [
"smoke: quick tests to check basic functionality",
"sanity: detailed tests to ensure major functions work correctly",
Expand Down
4 changes: 2 additions & 2 deletions src/guidellm/backend/openai.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import functools
import os
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, Generator, List, Optional

from loguru import logger
from openai import OpenAI, Stream
Expand Down Expand Up @@ -72,7 +72,7 @@ def __init__(

def make_request(
self, request: TextGenerationRequest
) -> Iterator[GenerativeResponse]:
) -> Generator[GenerativeResponse, None, None]:
"""
Make a request to the OpenAI backend.
Expand Down
23 changes: 15 additions & 8 deletions src/guidellm/core/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def end_time(self) -> float:
:rtype: float
"""

self._recording_started()
assert self._end_time
return self._end_time

Expand Down Expand Up @@ -205,7 +206,7 @@ def _recording_started(self, raise_exception: bool = True) -> bool:
else:
if raise_exception is True:
raise ValueError(
"start time is not specified. "
"Start time is not specified. "
"Did you make the `text_generation_benchmark.start()`?"
)
else:
Expand Down Expand Up @@ -270,7 +271,11 @@ class TextGenerationError:
:type error: Exception
"""

def __init__(self, request: TextGenerationRequest, error: Exception):
def __init__(
self,
request: TextGenerationRequest,
error_class: BaseException,
):
"""
Initialize the TextGenerationError with a unique identifier.
Expand All @@ -279,10 +284,10 @@ def __init__(self, request: TextGenerationRequest, error: Exception):
:param error: The exception that occurred during the text generation.
:type error: Exception
"""
self._request = request
self._error = error
self._request: TextGenerationRequest = request
self._error_class: BaseException = error_class

logger.error(f"Error occurred for request: {self._request}: {error}")
logger.error(f"Error occurred for request: {self._request}: {error_class}")

def __repr__(self) -> str:
"""
Expand All @@ -291,7 +296,9 @@ def __repr__(self) -> str:
:return: String representation of the TextGenerationError.
:rtype: str
"""
return f"TextGenerationError(request={self._request}, error={self._error})"
return (
f"TextGenerationError(request={self._request}, error={self._error_class})"
)

@property
def request(self) -> TextGenerationRequest:
Expand All @@ -304,14 +311,14 @@ def request(self) -> TextGenerationRequest:
return self._request

@property
def error(self) -> Exception:
def error(self) -> BaseException:
"""
Get the exception that occurred during the text generation.
:return: The exception.
:rtype: Exception
"""
return self._error
return self._error_class


@dataclass
Expand Down
4 changes: 2 additions & 2 deletions src/guidellm/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from .executor import Executor
from .profile_generator import (
Profile,
ProfileGenerationModes,
ProfileGenerationMode,
ProfileGenerator,
SingleProfileGenerator,
SweepProfileGenerator,
)

__all__ = [
"Executor",
"ProfileGenerationModes",
"ProfileGenerationMode",
"Profile",
"ProfileGenerator",
"SingleProfileGenerator",
Expand Down
47 changes: 34 additions & 13 deletions src/guidellm/executor/executor.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,63 @@
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional

from guidellm.backend import Backend
from guidellm.core import TextGenerationBenchmarkReport
from guidellm.executor.profile_generator import ProfileGenerationModes, ProfileGenerator
from guidellm.request import RequestGenerator
from guidellm.scheduler.scheduler import Scheduler
from guidellm.scheduler import Scheduler

from .profile_generator import ProfileGenerationMode, ProfileGenerator

__all__ = ["Executor"]


class Executor:
"""
The main purpose of the `class Executor` is to dispatch running tasks according
to the Profile Generation mode
"""

def __init__(
self,
request_generator: RequestGenerator,
backend: Backend,
profile_mode: Union[str, ProfileGenerationModes] = "single",
request_generator: RequestGenerator,
profile_mode: ProfileGenerationMode = ProfileGenerationMode.SINGLE,
profile_args: Optional[Dict[str, Any]] = None,
max_requests: Optional[int] = None,
max_duration: Optional[float] = None,
):
self.request_generator = request_generator
self.backend = backend
self.profile = ProfileGenerator.create_generator(
self.profile_generator: ProfileGenerator = ProfileGenerator.create(
profile_mode, **(profile_args or {})
)
self.max_requests = max_requests
self.max_duration = max_duration
self.max_requests: Optional[int] = max_requests
self.max_duration: Optional[float] = max_duration
self._scheduler: Optional[Scheduler] = None

@property
def scheduler(self) -> Scheduler:
if self._scheduler is None:
raise ValueError("The scheduler is not defined. Did you run the execution?")
else:
return self._scheduler

@scheduler.setter
def scheduler(self, value: Any):
if not isinstance(value, Scheduler):
raise TypeError(
"Only Scheduler instances could be set as a self._scheduler"
)
else:
self._scheduler = value

def run(self) -> TextGenerationBenchmarkReport:
report = TextGenerationBenchmarkReport()

while True:
profile = self.profile.next_profile(report)

if profile is None:
if not (profile := self.profile_generator.next(report)):
break

scheduler = Scheduler(
self.scheduler = Scheduler(
request_generator=self.request_generator,
backend=self.backend,
load_gen_mode=profile.load_gen_mode,
Expand All @@ -45,7 +66,7 @@ def run(self) -> TextGenerationBenchmarkReport:
max_duration=self.max_duration,
)

benchmark = scheduler.run()
benchmark = self.scheduler.run()
report.add_benchmark(benchmark)

return report
76 changes: 32 additions & 44 deletions src/guidellm/executor/profile_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,111 +6,99 @@
import numpy

from guidellm.core import TextGenerationBenchmarkReport
from guidellm.scheduler import LoadGenerationModes
from guidellm.scheduler import LoadGenerationMode

__all__ = [
"ProfileGenerationModes",
"ProfileGenerationMode",
"Profile",
"ProfileGenerator",
"SingleProfileGenerator",
"SweepProfileGenerator",
]


class ProfileGenerationModes(Enum):
class ProfileGenerationMode(Enum):
SINGLE = "single"
SWEEP = "sweep"


@dataclass()
@dataclass
class Profile:
load_gen_mode: LoadGenerationModes
load_gen_mode: LoadGenerationMode
load_gen_rate: Optional[float]


class ProfileGenerator(ABC):
_registry: Dict[ProfileGenerationModes, "Type[ProfileGenerator]"] = {}
_registry: Dict[ProfileGenerationMode, "Type[ProfileGenerator]"] = {}

@staticmethod
def register_generator(mode: ProfileGenerationModes):
def register(mode: ProfileGenerationMode):
def inner_wrapper(wrapped_class):
ProfileGenerator._registry[mode] = wrapped_class
return wrapped_class

return inner_wrapper

@staticmethod
def create_generator(
mode: Union[str, ProfileGenerationModes], **kwargs
) -> "ProfileGenerator":
if isinstance(mode, str):
mode = ProfileGenerationModes(mode)

def create(mode: ProfileGenerationMode, **kwargs) -> "ProfileGenerator":
if mode not in ProfileGenerator._registry:
raise ValueError(f"Invalid profile generation mode: {mode}")

return ProfileGenerator._registry[mode](**kwargs)

def __init__(self, mode: Union[str, ProfileGenerationModes]):
self._mode = ProfileGenerationModes(mode)
def __init__(self, mode: Union[str, ProfileGenerationMode]):
self._mode = ProfileGenerationMode(mode)

@abstractmethod
def next_profile(
self, current_report: TextGenerationBenchmarkReport
) -> Optional[Profile]:
def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profile]:
""" """
pass


@ProfileGenerator.register_generator(ProfileGenerationModes.SINGLE)
@ProfileGenerator.register(ProfileGenerationMode.SINGLE)
class SingleProfileGenerator(ProfileGenerator):
def __init__(self, rate: float, rate_type: str, **kwargs):
super().__init__(ProfileGenerationModes.SINGLE)
self._rate = rate
self._rate_type = rate_type
self._generated = False

def next_profile(
self, current_report: TextGenerationBenchmarkReport
) -> Optional[Profile]:
def __init__(self, rate: float, rate_type: LoadGenerationMode):
super().__init__(ProfileGenerationMode.SINGLE)
self._rate: float = rate
self._rate_type: LoadGenerationMode = rate_type
self._generated: bool = False

def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profile]:
if self._generated:
return None

self._generated = True

if self._rate_type == "constant":
if self._rate_type == LoadGenerationMode.CONSTANT:
return Profile(
load_gen_mode=LoadGenerationModes.CONSTANT, load_gen_rate=self._rate
load_gen_mode=LoadGenerationMode.CONSTANT, load_gen_rate=self._rate
)

if self._rate_type == "synchronous":
elif self._rate_type == LoadGenerationMode.SYNCHRONOUS:
return Profile(
load_gen_mode=LoadGenerationModes.SYNCHRONOUS, load_gen_rate=None
load_gen_mode=LoadGenerationMode.SYNCHRONOUS, load_gen_rate=None
)

if self._rate_type == "poisson":
elif self._rate_type == LoadGenerationMode.POISSON:
return Profile(
load_gen_mode=LoadGenerationModes.POISSON, load_gen_rate=self._rate
load_gen_mode=LoadGenerationMode.POISSON, load_gen_rate=self._rate
)

raise ValueError(f"Invalid rate type: {self._rate_type}")


@ProfileGenerator.register_generator(ProfileGenerationModes.SWEEP)
@ProfileGenerator.register(ProfileGenerationMode.SWEEP)
class SweepProfileGenerator(ProfileGenerator):
def __init__(self, **kwargs):
super().__init__(ProfileGenerationModes.SWEEP)
super().__init__(ProfileGenerationMode.SWEEP)
self._sync_run = False
self._max_found = False
self._pending_rates = None

def next_profile(
self, current_report: TextGenerationBenchmarkReport
) -> Optional[Profile]:
def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profile]:
if not self._sync_run:
self._sync_run = True

return Profile(
load_gen_mode=LoadGenerationModes.SYNCHRONOUS, load_gen_rate=None
load_gen_mode=LoadGenerationMode.SYNCHRONOUS, load_gen_rate=None
)

if not self._max_found:
Expand All @@ -125,7 +113,7 @@ def next_profile(
else last_benchmark.request_rate
)
return Profile(
load_gen_mode=LoadGenerationModes.CONSTANT,
load_gen_mode=LoadGenerationMode.CONSTANT,
load_gen_rate=last_rate * 2,
)
else:
Expand All @@ -148,7 +136,7 @@ def next_profile(
if self._pending_rates:
rate = self._pending_rates.pop(0)
return Profile(
load_gen_mode=LoadGenerationModes.CONSTANT, load_gen_rate=rate
load_gen_mode=LoadGenerationMode.CONSTANT, load_gen_rate=rate
)

return None
4 changes: 2 additions & 2 deletions src/guidellm/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .load_generator import LoadGenerationModes, LoadGenerator
from .load_generator import LoadGenerationMode, LoadGenerator
from .scheduler import Scheduler
from .task import Task

__all__ = ["LoadGenerationModes", "LoadGenerator", "Scheduler", "Task"]
__all__ = ["LoadGenerationMode", "LoadGenerator", "Scheduler", "Task"]
Loading

0 comments on commit 785aae6

Please sign in to comment.