[Serve] Group DeploymentHandle autoscaling metrics pushes by process #45957

Draft · wants to merge 9 commits into base: master
68 changes: 21 additions & 47 deletions python/ray/serve/_private/autoscaling_state.py
@@ -25,6 +25,9 @@ class HandleMetricReport:
"""Report from a deployment handle on queued and ongoing requests.

Args:
deployment_id: The identifier for the deployment that the
handle targets.
handle_id: The unique identifier for the deployment handle.
actor_id: If the deployment handle (from which this metric was
sent) lives on an actor, the actor ID of that actor.
handle_source: Describes what kind of entity holds this
@@ -39,6 +42,8 @@ class HandleMetricReport:
timestamp: The time at which this report was received.
"""

deployment_id: DeploymentID
handle_id: str
actor_id: Optional[str]
handle_source: DeploymentHandleSource
queued_requests: float
@@ -88,10 +93,10 @@ def __init__(self, deployment_id: DeploymentID):
# Map from handle ID to handle request metric report. Metrics
# are removed from this dict either when the actor on which the
# handle lived dies, or after a period of no updates.
self._handle_requests: Dict[str, HandleMetricReport] = dict()
self._handle_requests: Dict[str, HandleMetricReport] = {}
# Map from replica ID to replica request metric report. Metrics
# are removed from this dict when a replica is stopped.
self._replica_requests: Dict[ReplicaID, ReplicaMetricReport] = dict()
self._replica_requests: Dict[ReplicaID, ReplicaMetricReport] = {}

self._deployment_info = None
self._config = None
@@ -183,40 +188,22 @@ def record_request_metrics_for_replica(
if window_avg is None:
return

if (
replica_id not in self._replica_requests
or send_timestamp > self._replica_requests[replica_id].timestamp
):
previous_report = self._replica_requests.get(replica_id)

if previous_report is None or send_timestamp > previous_report.timestamp:
self._replica_requests[replica_id] = ReplicaMetricReport(
running_requests=window_avg,
timestamp=send_timestamp,
)

def record_request_metrics_for_handle(
self,
*,
handle_id: str,
actor_id: Optional[str],
handle_source: DeploymentHandleSource,
queued_requests: float,
running_requests: Dict[ReplicaID, float],
send_timestamp: float,
) -> None:
def record_request_metrics_for_handle(self, report: HandleMetricReport) -> None:
"""Records average number of queued and running requests at a handle for this
deployment.
"""
previous_report = self._handle_requests.get(report.handle_id)

if (
handle_id not in self._handle_requests
or send_timestamp > self._handle_requests[handle_id].timestamp
):
self._handle_requests[handle_id] = HandleMetricReport(
actor_id=actor_id,
handle_source=handle_source,
queued_requests=queued_requests,
running_requests=running_requests,
timestamp=send_timestamp,
)
if previous_report is None or report.timestamp > previous_report.timestamp:
self._handle_requests[report.handle_id] = report

def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None:
"""Drops handle metrics that are no longer valid.
@@ -394,28 +381,15 @@ def record_request_metrics_for_replica(
send_timestamp=send_timestamp,
)

def record_request_metrics_for_handle(
self,
*,
deployment_id: str,
handle_id: str,
actor_id: Optional[str],
handle_source: DeploymentHandleSource,
queued_requests: float,
running_requests: Dict[ReplicaID, float],
send_timestamp: float,
) -> None:
def record_request_metrics_for_handle(self, report: HandleMetricReport) -> None:
"""Update request metric for a specific handle."""

if deployment_id in self._autoscaling_states:
self._autoscaling_states[deployment_id].record_request_metrics_for_handle(
handle_id=handle_id,
actor_id=actor_id,
handle_source=handle_source,
queued_requests=queued_requests,
running_requests=running_requests,
send_timestamp=send_timestamp,
)
try:
autoscaling_state = self._autoscaling_states[report.deployment_id]
except KeyError:
return

autoscaling_state.record_request_metrics_for_handle(report)

def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None:
"""Drops handle metrics that are no longer valid.
39 changes: 16 additions & 23 deletions python/ray/serve/_private/controller.py
@@ -4,6 +4,7 @@
import os
import pickle
import time
from collections.abc import Sequence
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import ray
@@ -12,9 +13,11 @@
from ray._raylet import GcsClient
from ray.actor import ActorHandle
from ray.serve._private.application_state import ApplicationStateManager, StatusOverview
from ray.serve._private.autoscaling_state import AutoscalingStateManager
from ray.serve._private.autoscaling_state import (
AutoscalingStateManager,
HandleMetricReport,
)
from ray.serve._private.common import (
DeploymentHandleSource,
DeploymentID,
MultiplexedReplicaInfo,
NodeId,
@@ -260,29 +263,19 @@ def record_autoscaling_metrics(
replica_id, window_avg, send_timestamp
)

def record_handle_metrics(
self,
deployment_id: str,
handle_id: str,
actor_id: Optional[str],
handle_source: DeploymentHandleSource,
queued_requests: float,
running_requests: Dict[str, float],
send_timestamp: float,
):
def record_handle_metrics(self, report: HandleMetricReport) -> None:
logger.debug(
f"Received metrics from handle {handle_id} for deployment {deployment_id}: "
f"{queued_requests} queued requests and {running_requests} running requests"
)
self.autoscaling_state_manager.record_request_metrics_for_handle(
deployment_id=deployment_id,
handle_id=handle_id,
actor_id=actor_id,
handle_source=handle_source,
queued_requests=queued_requests,
running_requests=running_requests,
send_timestamp=send_timestamp,
f"Received metrics from handle {report.handle_id} "
f"for deployment {report.deployment_id}: "
f"{report.queued_requests} queued requests "
f"and {report.running_requests} running requests"
)
self.autoscaling_state_manager.record_request_metrics_for_handle(report)

def bulk_record_handle_metrics(self, reports: Sequence[HandleMetricReport]) -> None:
logger.debug(f"Received {len(reports)} bulk handle metrics reports")
for report in reports:
self.record_handle_metrics(report)

def _dump_autoscaling_metrics_for_testing(self):
return self.autoscaling_state_manager.get_metrics()
5 changes: 3 additions & 2 deletions python/ray/serve/_private/replica.py
@@ -231,12 +231,13 @@ def record_request_metrics(

def _push_autoscaling_metrics(self) -> Dict[str, Any]:
look_back_period = self._autoscaling_config.look_back_period_s
now = time.time()
self._controller_handle.record_autoscaling_metrics.remote(
replica_id=self._replica_id,
window_avg=self._metrics_store.window_average(
self._replica_id, time.time() - look_back_period
self._replica_id, now - look_back_period
),
send_timestamp=time.time(),
send_timestamp=now,
)

def _add_autoscaling_metrics_point(self) -> None:
79 changes: 63 additions & 16 deletions python/ray/serve/_private/router.py
@@ -3,15 +3,17 @@
import logging
import threading
import time
import weakref
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import contextmanager
from functools import partial
from functools import lru_cache, partial
from typing import Any, Coroutine, DefaultDict, Dict, List, Optional, Tuple, Union

import ray
from ray.actor import ActorHandle
from ray.exceptions import ActorDiedError, ActorUnavailableError, RayError
from ray.serve._private.autoscaling_state import HandleMetricReport
from ray.serve._private.common import (
DeploymentHandleSource,
DeploymentID,
@@ -201,10 +203,12 @@ def update_deployment_config(
),
)
# Push metrics to the controller periodically.
self.metrics_pusher.register_or_update_task(
self.PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
self.push_autoscaling_metrics_to_controller,
autoscaling_config.metrics_interval_s,

JoshKarpel (Contributor Author), Dec 18, 2024:
@zcin @edoakes I'm working on resurrecting this PR and I think this was the main sticking point - the shared metrics pusher wouldn't respect the autoscaling_config.metrics_interval_s in the current form of the PR. Can we deprecate that parameter? Would we need to feature-flag this PR so it can be introduced gradually? Or maybe we want to preserve it and have a different shared pusher for each metric_interval_s?

Contributor:
I am fine with making the metrics_interval_s a cluster-level option configured via env var. WDYT Cindy?

JoshKarpel (Contributor Author):
Ah! I like that option!

Contributor:
Yes I think making the metrics interval a cluster level option makes sense.

JoshKarpel (Contributor Author):
Sounds good - I will take that approach when I get back from the holidays

JoshKarpel (Contributor Author):
Oh, it turns out there already is an env var for this, HANDLE_METRIC_PUSH_INTERVAL_S:

# Handle metric push interval. (This interval will affect the cold start time period)
HANDLE_METRIC_PUSH_INTERVAL_S = float(
    os.environ.get("RAY_SERVE_HANDLE_METRIC_PUSH_INTERVAL_S", "10")
)

and I'm already using it (the shared pusher registers its push task with HANDLE_METRIC_PUSH_INTERVAL_S).

The autoscaling_config.metrics_interval_s is also used in a variety of other places, like for controlling how often metrics are recorded:

min(
    RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_PERIOD_S,
    autoscaling_config.metrics_interval_s,
),

Should I remove all of those and just use the single env var, or keep the other uses and only "ignore" the setting here for the shared pusher?

I'm inclined to remove all of them for consistency, but that's a much bigger blast radius...

Contributor:
I think we should remove autoscaling_config.metrics_interval_s and just use RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_PERIOD_S for recording metrics, because the interval at which we record metrics is usually more frequent than pushing metrics to the controller. And then we can use HANDLE_METRIC_PUSH_INTERVAL_S, but perhaps renaming it to include "autoscaling" would be better going forward, if it is to replace autoscaling_config.metrics_interval_s?
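
For concreteness, here is a minimal sketch of the split being discussed, assuming the env var names quoted above are kept; the record-period default and env var spelling below are placeholders for illustration, not values from this PR:

import os

# How often each handle samples queued/running requests locally.
# (Assumed env var name mirroring the constant quoted above; default is illustrative.)
RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_PERIOD_S = float(
    os.environ.get("RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_PERIOD_S", "0.5")
)

# How often the shared per-process pusher sends aggregated reports to the controller.
# (This env var exists today, per the snippet above.)
HANDLE_METRIC_PUSH_INTERVAL_S = float(
    os.environ.get("RAY_SERVE_HANDLE_METRIC_PUSH_INTERVAL_S", "10")
)

# Recording is expected to be more frequent than pushing: the record period keeps
# the local window fresh, while the push interval controls controller traffic.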

shared = SharedHandleMetricsPusher.get_or_create(
self._controller_handle
)
shared.register(self)
logger.info(
f"Registered {self._handle_id} with shared metrics pusher {shared}."
)
else:

JoshKarpel (Contributor Author):
I'm not sure I understand this else block - it seems like even if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = False we still register the pushing task? It just won't have any data because the _add_autoscaling_metrics_point task isn't registered? 🤔

self.metrics_pusher.register_or_update_task(
@@ -249,21 +253,24 @@ def should_send_scaled_to_zero_optimized_push(self, curr_num_replicas: int) -> bool:
and self.num_queued_requests > 0
)

def push_autoscaling_metrics_to_controller(self):
"""Pushes queued and running request metrics to the controller.

These metrics are used by the controller for autoscaling.
"""

self._controller_handle.record_handle_metrics.remote(
send_timestamp=time.time(),
def metrics_report(self) -> HandleMetricReport:
return HandleMetricReport(
timestamp=time.time(),
deployment_id=self._deployment_id,
handle_id=self._handle_id,
actor_id=self._self_actor_id,
handle_source=self._handle_source,
**self._get_aggregated_requests(),
)

def push_autoscaling_metrics_to_controller(self):
"""Pushes queued and running request metrics to the controller.

These metrics are used by the controller for autoscaling.
"""

self._controller_handle.record_handle_metrics.remote(self.metrics_report())

def _add_autoscaling_metrics_point(self):
"""Adds metrics point for queued and running requests at replicas.

@@ -280,16 +287,17 @@ def _add_autoscaling_metrics_point(self):
)

# Prevent in memory metrics store memory from growing
start_timestamp = time.time() - self.autoscaling_config.look_back_period_s
start_timestamp = timestamp - self.autoscaling_config.look_back_period_s
self.metrics_store.prune_keys_and_compact_data(start_timestamp)

def _get_aggregated_requests(self):
running_requests = dict()
running_requests = {}
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE and self.autoscaling_config:
look_back_period = self.autoscaling_config.look_back_period_s
window_start_time = time.time() - look_back_period
running_requests = {
replica_id: self.metrics_store.window_average(
replica_id, time.time() - look_back_period
replica_id, window_start_time
)
# If data hasn't been recorded yet, return current
# number of queued and ongoing requests.
@@ -311,6 +319,45 @@ async def shutdown(self):
self._shutdown = True


class SharedHandleMetricsPusher:
def __init__(self, controller_handle: ActorHandle):
self._controller_handler = controller_handle

self._metrics_pusher = MetricsPusher()
self._router_metrics_managers: weakref.WeakSet[
RouterMetricsManager
] = weakref.WeakSet()

@classmethod
@lru_cache(maxsize=None)
def get_or_create(
cls, controller_handle: ActorHandle
) -> "SharedHandleMetricsPusher":
pusher = cls(controller_handle=controller_handle)
pusher.start()
logger.info(f"Started {pusher}.")
return pusher

def register(self, router_metrics_manager: RouterMetricsManager) -> None:
self._router_metrics_managers.add(router_metrics_manager)

def start(self) -> None:
self._metrics_pusher.start()

self._metrics_pusher.register_or_update_task(
"push_metrics_to_controller",
self.push_metrics,
HANDLE_METRIC_PUSH_INTERVAL_S,
)

def push_metrics(self) -> None:
# TODO: gathering reports could block the event loop for a long time
logger.debug("Pushing handle metrics to controller")
self._controller_handler.bulk_record_handle_metrics.remote(
[m.metrics_report() for m in self._router_metrics_managers]
)


class Router(ABC):
@abstractmethod
def running_replicas_populated(self) -> bool:
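
As a rough, self-contained illustration of the per-process grouping pattern that the new SharedHandleMetricsPusher implements above: one pusher is memoized per controller handle with lru_cache, handles register themselves into a WeakSet, and a single bulk call carries all of their reports. FakeController and FakeMetricsManager are stand-ins invented for this sketch; the real code uses Ray actor handles, RouterMetricsManager, and MetricsPusher.

import weakref
from functools import lru_cache


class FakeMetricsManager:
    """Stand-in for RouterMetricsManager: only produces a report."""

    def __init__(self, handle_id: str):
        self.handle_id = handle_id

    def metrics_report(self) -> dict:
        return {"handle_id": self.handle_id, "queued_requests": 0}


class FakeController:
    """Stand-in for the Serve controller actor handle."""

    def bulk_record_handle_metrics(self, reports) -> None:
        print(f"controller received {len(reports)} handle metric reports")


class SharedPusher:
    """One instance per (process, controller); all handles in the process share it."""

    def __init__(self, controller: FakeController):
        self._controller = controller
        # Weak references: a handle that is garbage collected silently
        # drops out of future pushes.
        self._managers = weakref.WeakSet()

    @classmethod
    @lru_cache(maxsize=None)
    def get_or_create(cls, controller: FakeController) -> "SharedPusher":
        # Memoized per controller handle, so every handle created in this
        # process reuses the same pusher.
        return cls(controller)

    def register(self, manager: FakeMetricsManager) -> None:
        self._managers.add(manager)

    def push_once(self) -> None:
        # One bulk call per process instead of one call per handle.
        reports = [m.metrics_report() for m in self._managers]
        self._controller.bulk_record_handle_metrics(reports)


controller = FakeController()
pusher = SharedPusher.get_or_create(controller)
assert pusher is SharedPusher.get_or_create(controller)  # same shared instance
m1, m2 = FakeMetricsManager("handle-1"), FakeMetricsManager("handle-2")
pusher.register(m1)
pusher.register(m2)
pusher.push_once()  # prints: controller received 2 handle metric reports

Because the set holds only weak references, a handle that goes away stops appearing in subsequent pushes without any explicit unregistration, which is the lifecycle concern a shared, process-wide pusher otherwise introduces.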
17 changes: 10 additions & 7 deletions python/ray/serve/tests/unit/test_router.py
@@ -11,6 +11,7 @@
from ray._private.test_utils import async_wait_for_condition
from ray._private.utils import get_or_create_event_loop
from ray.exceptions import ActorDiedError, ActorUnavailableError
from ray.serve._private.autoscaling_state import HandleMetricReport
from ray.serve._private.common import (
DeploymentHandleSource,
DeploymentID,
@@ -907,13 +908,15 @@ def test_push_autoscaling_metrics_to_controller(self):
# Check metrics are pushed correctly
metrics_manager.push_autoscaling_metrics_to_controller()
mock_controller_handle.record_handle_metrics.remote.assert_called_with(
deployment_id=deployment_id,
handle_id=handle_id,
actor_id=self_actor_id,
handle_source=DeploymentHandleSource.PROXY,
queued_requests=n,
running_requests=running_requests,
send_timestamp=start,
HandleMetricReport(
deployment_id=deployment_id,
handle_id=handle_id,
actor_id=self_actor_id,
handle_source=DeploymentHandleSource.PROXY,
queued_requests=n,
running_requests=running_requests,
timestamp=start,
)
)

@pytest.mark.skipif(