Create token metrics only when they are available (opea-project#1092)
* Create token metrics only when they are available

This avoids generating useless token/request histogram metrics for
services that use the Orchestrator class but never call its token
processing functionality.

(This helps differentiate frontend megaservice metrics from backend
megaservice ones, especially when multiple OPEA applications run in
the same cluster.)
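
For illustration only (not part of this commit), a minimal sketch of the
resulting behaviour, assuming the comps package is importable, the
prometheus_client default registry is in use, and OrchestratorMetrics is
instantiated directly:

    import time

    from prometheus_client import REGISTRY

    from comps.cores.mega.orchestrator import OrchestratorMetrics

    metrics = OrchestratorMetrics()
    # Only the pending-request gauge has been registered so far.
    print(sorted(m.name for m in REGISTRY.collect() if m.name.startswith("megaservice")))

    # The token histograms appear only once token processing is actually used.
    start = time.time()
    start = metrics.token_update(start, is_first=True)
    print(sorted(m.name for m in REGISTRY.collect() if m.name.startswith("megaservice")))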

Also change the Orchestrator CI test workaround to use a unique prefix
for each metrics instance, instead of the metrics being (singleton)
class variables.
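
As an aside (not from the diff itself), the prefix rule this introduces is:
the first metrics instance keeps the plain "megaservice" prefix, later ones
get a numeric suffix (e.g. "megaservice2_request_pending"). A tiny sketch of
that mapping, with a hypothetical helper name:

    def metric_prefix(instance_id: int) -> str:
        # Mirrors the prefix logic in OrchestratorMetrics.__init__()
        return "megaservice" if instance_id == 1 else f"megaservice{instance_id}"

    assert metric_prefix(1) == "megaservice"
    assert metric_prefix(2) == "megaservice2"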

Signed-off-by: Eero Tamminen <[email protected]>

* Add locking for latency metric creation / method change

As that could be called from multiple request-handling threads.
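
For background (not part of the commit), the sequence being locked is a
check-then-create step; without the lock and the re-check, two request
handling threads could both try to register the same histogram. A
self-contained sketch of the same idea, with hypothetical names, assuming
prometheus_client:

    import threading

    from prometheus_client import Histogram


    class LazyLatencyMetric:
        """Illustrative sketch of lazy, thread-safe metric creation via method swap."""

        def __init__(self, name: str) -> None:
            self._name = name
            self._histogram = None
            self._lock = threading.Lock()
            # Start with the creating variant; it swaps itself out after first use.
            self.observe = self._create_and_observe

        def _create_and_observe(self, value: float) -> None:
            with self._lock:
                # Another thread may have finished creation while we waited for the lock.
                if self._histogram is None:
                    self._histogram = Histogram(self._name, "Latency (histogram)")
                    self.observe = self._observe
            self._observe(value)

        def _observe(self, value: float) -> None:
            self._histogram.observe(value)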

Signed-off-by: Eero Tamminen <[email protected]>

---------

Signed-off-by: Eero Tamminen <[email protected]>
Co-authored-by: Malini Bhandaru <[email protected]>
eero-t and mkbhanda authored Feb 5, 2025
1 parent 119acf2 commit 4ede405
Showing 1 changed file with 50 additions and 11 deletions.
comps/cores/mega/orchestrator.py (61 changes: 50 additions & 11 deletions)

@@ -7,6 +7,7 @@
 import json
 import os
 import re
+import threading
 import time
 from typing import Dict, List

@@ -27,28 +28,66 @@


 class OrchestratorMetrics:
-    # Because:
+    # Need an instance ID for metric prefix because:
+    # - Orchestror instances are not named
     # - CI creates several orchestrator instances
-    # - Prometheus requires metrics to be singletons
-    # - Oorchestror instances are not provided their own names
-    # Metrics are class members with "megaservice" name prefix
-    first_token_latency = Histogram("megaservice_first_token_latency", "First token latency (histogram)")
-    inter_token_latency = Histogram("megaservice_inter_token_latency", "Inter-token latency (histogram)")
-    request_latency = Histogram("megaservice_request_latency", "Whole request/reply latency (histogram)")
-    request_pending = Gauge("megaservice_request_pending", "Count of currently pending requests (gauge)")
+    # - Prometheus requires metrics (their names) to be unique
+    _instance_id = 0

     def __init__(self) -> None:
-        pass
+        self._instance_id += 1
+        if self._instance_id > 1:
+            self._prefix = f"megaservice{self._instance_id}"
+        else:
+            self._prefix = "megaservice"
+
+        self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)")
+
+        # locking for latency metric creation / method change
+        self._lock = threading.Lock()
+
+        # Metrics related to token processing are created on demand,
+        # to avoid bogus ones for services that never handle tokens
+        self.first_token_latency = None
+        self.inter_token_latency = None
+        self.request_latency = None
+
+        # initial methods to create the metrics
+        self.token_update = self._token_update_create
+        self.request_update = self._request_update_create
+
+    def _token_update_create(self, token_start: float, is_first: bool) -> float:
+        with self._lock:
+            # in case another thread already got here
+            if self.token_update == self._token_update_create:
+                self.first_token_latency = Histogram(
+                    f"{self._prefix}_first_token_latency", "First token latency (histogram)"
+                )
+                self.inter_token_latency = Histogram(
+                    f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)"
+                )
+                self.token_update = self._token_update_real
+        return self.token_update(token_start, is_first)
+
+    def _request_update_create(self, req_start: float) -> None:
+        with self._lock:
+            # in case another thread already got here
+            if self.request_update == self._request_update_create:
+                self.request_latency = Histogram(
+                    f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
+                )
+                self.request_update = self._request_update_real
+        self.request_update(req_start)

-    def token_update(self, token_start: float, is_first: bool) -> float:
+    def _token_update_real(self, token_start: float, is_first: bool) -> float:
         now = time.time()
         if is_first:
             self.first_token_latency.observe(now - token_start)
         else:
             self.inter_token_latency.observe(now - token_start)
         return now

-    def request_update(self, req_start: float) -> None:
+    def _request_update_real(self, req_start: float) -> None:
         self.request_latency.observe(time.time() - req_start)

     def pending_update(self, increase: bool) -> None:
