From 68f765e474cac48dbf6111659d9b9cc8557a9f40 Mon Sep 17 00:00:00 2001
From: Eero Tamminen
Date: Mon, 23 Dec 2024 12:24:31 +0200
Subject: [PATCH 1/2] Create token metrics only when they are available

This avoids generating useless token/request histogram metrics for
services that use the Orchestrator class but never call its token
processing functionality.

(Helps in differentiating frontend megaservice metrics from backend
megaservice ones, especially when multiple OPEA applications run in
the same cluster.)

Also change the Orchestrator CI test workaround to use a unique prefix
for each metric instance, instead of the metrics being (singleton)
class variables.

Signed-off-by: Eero Tamminen
---
 comps/cores/mega/orchestrator.py | 49 +++++++++++++++++++++++++++++++++++++------------
 1 file changed, 37 insertions(+), 12 deletions(-)

diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py
index 4053988566..95e84ea10f 100644
--- a/comps/cores/mega/orchestrator.py
+++ b/comps/cores/mega/orchestrator.py
@@ -27,20 +27,45 @@
 
 
 class OrchestratorMetrics:
-    # Because:
+    # Need an instance ID for metric prefix because:
+    # - Orchestrator instances are not named
     # - CI creates several orchestrator instances
-    # - Prometheus requires metrics to be singletons
-    # - Oorchestror instances are not provided their own names
-    # Metrics are class members with "megaservice" name prefix
-    first_token_latency = Histogram("megaservice_first_token_latency", "First token latency (histogram)")
-    inter_token_latency = Histogram("megaservice_inter_token_latency", "Inter-token latency (histogram)")
-    request_latency = Histogram("megaservice_request_latency", "Whole request/reply latency (histogram)")
-    request_pending = Gauge("megaservice_request_pending", "Count of currently pending requests (gauge)")
+    # - Prometheus requires metrics (their names) to be unique
+    _instance_id = 0
 
     def __init__(self) -> None:
-        pass
-
-    def token_update(self, token_start: float, is_first: bool) -> float:
+        OrchestratorMetrics._instance_id += 1
+        if self._instance_id > 1:
+            self._prefix = f"megaservice{self._instance_id}"
+        else:
+            self._prefix = "megaservice"
+
+        self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)")
+
+        # Metrics related to token processing are created on demand,
+        # to avoid bogus ones for services that never handle tokens
+        self.first_token_latency = None
+        self.inter_token_latency = None
+        self.request_latency = None
+
+        # initial methods create the metric
+        self.token_update = self._token_update_create
+        self.request_update = self._request_update_create
+
+    def _token_update_create(self, token_start: float, is_first: bool) -> float:
+        self.first_token_latency = Histogram(f"{self._prefix}_first_token_latency", "First token latency (histogram)")
+        self.inter_token_latency = Histogram(f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)")
+        self.token_update = self._token_update_real
+        return self.token_update(token_start, is_first)
+
+    def _request_update_create(self, req_start: float) -> None:
+        self.request_latency = Histogram(
+            f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
+        )
+        self.request_update = self._request_update_real
+        self.request_update(req_start)
+
+    def _token_update_real(self, token_start: float, is_first: bool) -> float:
         now = time.time()
         if is_first:
             self.first_token_latency.observe(now - token_start)
@@ -48,7 +73,7 @@ def token_update(self, token_start: float, is_first: bool) -> float:
             self.inter_token_latency.observe(now - token_start)
         return now
 
-    def request_update(self, req_start: float) -> None:
+    def _request_update_real(self, req_start: float) -> None:
         self.request_latency.observe(time.time() - req_start)
 
     def pending_update(self, increase: bool) -> None:
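Note on the pattern in patch 1: each Histogram is created on first use, and
the update method is then rebound on the instance so that later calls pay no
"is it created yet?" check. A minimal standalone sketch of the idea, using
hypothetical names (LazyMetrics, update) that are not part of the patch:

    import time

    from prometheus_client import Histogram


    class LazyMetrics:
        """Create the histogram only when the first data point arrives."""

        def __init__(self, prefix: str) -> None:
            self._prefix = prefix
            self.latency = None
            # First call lands in the creator method, which replaces itself
            self.update = self._update_create

        def _update_create(self, start: float) -> None:
            # Registers the metric with Prometheus only now, on first use
            self.latency = Histogram(f"{self._prefix}_latency", "Request latency (histogram)")
            self.update = self._update_real
            self.update(start)

        def _update_real(self, start: float) -> None:
            self.latency.observe(time.time() - start)

Until update() is called, no {prefix}_latency series is registered, so it
never shows up on the service's /metrics page; that is what the commit
message means by avoiding useless metrics for services that never handle
tokens.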
From 39cd93ad396531ddbe43cbfa1d5c855b6c8e1813 Mon Sep 17 00:00:00 2001
From: Eero Tamminen
Date: Fri, 10 Jan 2025 16:20:15 +0200
Subject: [PATCH 2/2] Add locking for latency metric creation / method change

As those can be called from multiple request handling threads.

Signed-off-by: Eero Tamminen
---
 comps/cores/mega/orchestrator.py | 30 ++++++++++++++++++++--------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py
index 95e84ea10f..97ee2a76b3 100644
--- a/comps/cores/mega/orchestrator.py
+++ b/comps/cores/mega/orchestrator.py
@@ -7,6 +7,7 @@
 import json
 import os
 import re
+import threading
 import time
 from typing import Dict, List
 
@@ -42,27 +43,40 @@ def __init__(self) -> None:
 
         self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)")
 
+        # locking for latency metric creation / method change
+        self._lock = threading.Lock()
+
         # Metrics related to token processing are created on demand,
         # to avoid bogus ones for services that never handle tokens
         self.first_token_latency = None
         self.inter_token_latency = None
         self.request_latency = None
 
-        # initial methods create the metric
+        # initial methods to create the metrics
         self.token_update = self._token_update_create
         self.request_update = self._request_update_create
 
     def _token_update_create(self, token_start: float, is_first: bool) -> float:
-        self.first_token_latency = Histogram(f"{self._prefix}_first_token_latency", "First token latency (histogram)")
-        self.inter_token_latency = Histogram(f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)")
-        self.token_update = self._token_update_real
+        with self._lock:
+            # in case another thread already got here
+            if self.token_update == self._token_update_create:
+                self.first_token_latency = Histogram(
+                    f"{self._prefix}_first_token_latency", "First token latency (histogram)"
+                )
+                self.inter_token_latency = Histogram(
+                    f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)"
+                )
+                self.token_update = self._token_update_real
         return self.token_update(token_start, is_first)
 
     def _request_update_create(self, req_start: float) -> None:
-        self.request_latency = Histogram(
-            f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
-        )
-        self.request_update = self._request_update_real
+        with self._lock:
+            # in case another thread already got here
+            if self.request_update == self._request_update_create:
+                self.request_latency = Histogram(
+                    f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
+                )
+                self.request_update = self._request_update_real
         self.request_update(req_start)
 
     def _token_update_real(self, token_start: float, is_first: bool) -> float:
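Note on the locking in patch 2: this is the classic double-checked
initialization pattern. Without the re-check inside the lock, two threads
racing through the creator method could each try to create the histograms,
and prometheus_client raises ValueError on a duplicated timeseries name.
A minimal standalone sketch, again with hypothetical names
(LockedLazyMetrics, update) that are not taken from the patch:

    import threading
    import time

    from prometheus_client import Histogram


    class LockedLazyMetrics:
        """Thread-safe one-time metric creation: lock plus re-check."""

        def __init__(self, prefix: str) -> None:
            self._prefix = prefix
            self._lock = threading.Lock()
            self.latency = None
            self.update = self._update_create

        def _update_create(self, start: float) -> None:
            with self._lock:
                # A competing thread may have finished creation while this
                # one waited on the lock; re-check before creating again
                if self.update == self._update_create:
                    self.latency = Histogram(f"{self._prefix}_latency", "Request latency (histogram)")
                    self.update = self._update_real
            self.update(start)

        def _update_real(self, start: float) -> None:
            self.latency.observe(time.time() - start)

The bound-method comparison works because Python bound methods compare equal
when they wrap the same function on the same instance: a thread that entered
the creator before the swap acquires the lock, finds the check False, and
skips creating the metric a second time.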