From 20b9a2d86210e598087b0230b1c66b969d712280 Mon Sep 17 00:00:00 2001 From: Jisang Ahn <113268235+michahn01@users.noreply.github.com> Date: Sun, 2 Feb 2025 19:45:12 -0500 Subject: [PATCH] Zeus daemon integration for `ZeusMonitor` CPU and DRAM (#150) --- zeus/device/cpu/rapl.py | 99 +++++++++++++++++++++++++++++++++++- zeusd/src/devices/cpu/mod.rs | 38 +++++++++++--- zeusd/src/routes/cpu.rs | 25 ++++++++- zeusd/tests/cpu.rs | 19 +++++++ zeusd/tests/helpers/mod.rs | 2 +- 5 files changed, 171 insertions(+), 12 deletions(-) diff --git a/zeus/device/cpu/rapl.py b/zeus/device/cpu/rapl.py index 20e0e552..ff4a8577 100644 --- a/zeus/device/cpu/rapl.py +++ b/zeus/device/cpu/rapl.py @@ -18,14 +18,17 @@ import os import time import warnings +from pathlib import Path from functools import lru_cache from glob import glob from multiprocessing.sharedctypes import Synchronized from typing import Sequence +import httpx + import zeus.device.cpu.common as cpu_common from zeus.device.cpu.common import CpuDramMeasurement -from zeus.device.exception import ZeusBaseCPUError +from zeus.device.exception import ZeusBaseCPUError, ZeusdError from zeus.utils.logging import get_logger logger = get_logger(name=__name__) @@ -261,6 +264,78 @@ def supportsGetDramEnergyConsumption(self) -> bool: return self.dram is not None +class ZeusdRAPLCPU(RAPLCPU): + """A RAPLCPU that interfaces with RAPL via zeusd. + + The parent RAPLCPU class requires root privileges to interface with RAPL. + ZeusdRAPLCPU (this class) overrides RAPLCPU's methods so that they instead send requests + to the Zeus daemon, which will interface with RAPL on behalf of ZeusdRAPLCPU. As a result, + ZeusdRAPLCPU does not need root privileges to monitor CPU and DRAM energy consumption. + + See [here](https://ml.energy/zeus/getting_started/#system-privileges) + for details on system privileges required. + """ + + def __init__( + self, + cpu_index: int, + zeusd_sock_path: str = "/var/run/zeusd.sock", + ) -> None: + """Initialize the Intel CPU with a specified index.""" + self.cpu_index = cpu_index + + self._client = httpx.Client(transport=httpx.HTTPTransport(uds=zeusd_sock_path)) + self._url_prefix = f"http://zeusd/cpu/{cpu_index}" + + self.dram_available = self._supportsGetDramEnergyConsumption() + + def _supportsGetDramEnergyConsumption(self) -> bool: + """Calls zeusd to return if the specified CPU supports DRAM energy monitoring.""" + resp = self._client.get( + self._url_prefix + "/supports_dram_energy", + ) + if resp.status_code != 200: + raise ZeusdError( + f"Failed to get whether DRAM energy is supported: {resp.text}" + ) + data = resp.json() + dram_available = data.get("dram_available") + if dram_available is None: + raise ZeusdError("Failed to get whether DRAM energy is supported.") + return dram_available + + def getTotalEnergyConsumption(self) -> CpuDramMeasurement: + """Returns the total energy consumption of the specified powerzone. Units: mJ.""" + resp = self._client.post( + self._url_prefix + "/get_index_energy", + json={ + "cpu": True, + "dram": True, + }, + ) + if resp.status_code != 200: + raise ZeusdError(f"Failed to get total energy consumption: {resp.text}") + + data = resp.json() + cpu_mj = data["cpu_energy_uj"] / 1000 + + dram_mj = None + dram_uj = data.get("dram_energy_uj") + if dram_uj is None: + if self.dram_available: + raise ZeusdError( + "DRAM energy should be available but no measurement was found" + ) + else: + dram_mj = dram_uj / 1000 + + return CpuDramMeasurement(cpu_mj=cpu_mj, dram_mj=dram_mj) + + def supportsGetDramEnergyConsumption(self) -> bool: + """Returns True if the specified CPU powerzone supports retrieving the subpackage energy consumption.""" + return self.dram_available + + class RAPLCPUs(cpu_common.CPUs): """RAPL CPU Manager object, containing individual RAPLCPU objects, abstracting RAPL calls and handling related exceptions.""" @@ -281,13 +356,33 @@ def _init_cpus(self) -> None: """Initialize all Intel CPUs.""" self._cpus = [] + cpu_indices = [] + def sort_key(dir): return int(dir.split(":")[1]) for dir in sorted(glob(f"{self.rapl_dir}/intel-rapl:*"), key=sort_key): parts = dir.split(":") if len(parts) > 1 and parts[1].isdigit(): - self._cpus.append(RAPLCPU(int(parts[1]), self.rapl_dir)) + cpu_indices.append(int(parts[1])) + + # If `ZEUSD_SOCK_PATH` is set, always use ZeusdRAPLCPU + if (sock_path := os.environ.get("ZEUSD_SOCK_PATH")) is not None: + if not Path(sock_path).exists(): + raise ZeusdError( + f"ZEUSD_SOCK_PATH points to non-existent file: {sock_path}" + ) + if not Path(sock_path).is_socket(): + raise ZeusdError(f"ZEUSD_SOCK_PATH is not a socket: {sock_path}") + if not os.access(sock_path, os.W_OK): + raise ZeusdError(f"ZEUSD_SOCK_PATH is not writable: {sock_path}") + self._cpus = [ + ZeusdRAPLCPU(cpu_index, sock_path) for cpu_index in cpu_indices + ] + else: + self._cpus = [ + RAPLCPU(cpu_index, self.rapl_dir) for cpu_index in cpu_indices + ] def __del__(self) -> None: """Shuts down the Intel CPU monitoring.""" diff --git a/zeusd/src/devices/cpu/mod.rs b/zeusd/src/devices/cpu/mod.rs index f79ee1ee..fbfc6c44 100644 --- a/zeusd/src/devices/cpu/mod.rs +++ b/zeusd/src/devices/cpu/mod.rs @@ -35,6 +35,19 @@ pub struct RaplResponse { pub dram_energy_uj: Option, } +#[derive(Serialize, Deserialize, Debug)] +pub struct DramAvailabilityResponse { + pub dram_available: bool, +} + +/// Unified CPU response type +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum CpuResponse { + Rapl(RaplResponse), + Dram(DramAvailabilityResponse), +} + pub trait CpuManager { /// Get the number of CPUs available. fn device_count() -> Result; @@ -55,7 +68,7 @@ pub trait CpuManager { pub type CpuCommandRequest = ( CpuCommand, - Option>>, + Option>>, Instant, Span, ); @@ -89,7 +102,7 @@ impl CpuManagementTasks { cpu_id: usize, command: CpuCommand, request_start_time: Instant, - ) -> Result { + ) -> Result { if cpu_id >= self.senders.len() { return Err(ZeusdError::CpuNotFoundError(cpu_id)); } @@ -128,6 +141,8 @@ impl CpuManagementTasks { pub enum CpuCommand { /// Get the CPU and DRAM energy measurement for the CPU index GetIndexEnergy { cpu: bool, dram: bool }, + /// Return if the specified CPU supports DRAM energy measurement + SupportsDramEnergy, /// Stop the monitoring task for CPU and DRAM if they have been started. StopMonitoring, } @@ -156,7 +171,7 @@ impl CpuCommand { &self, device: &mut T, _request_arrival_time: Instant, - ) -> Result + ) -> Result where T: CpuManager, { @@ -172,17 +187,24 @@ impl CpuCommand { } else { None }; - Ok(RaplResponse { + // Wrap the RaplResponse in CpuResponse::Rapl + Ok(CpuResponse::Rapl(RaplResponse { cpu_energy_uj, dram_energy_uj, - }) + })) + } + Self::SupportsDramEnergy => { + // Wrap the DramAvailabilityResponse in CpuResponse::Dram + Ok(CpuResponse::Dram(DramAvailabilityResponse { + dram_available: device.is_dram_available(), + })) } - Self::StopMonitoring {} => { + Self::StopMonitoring => { device.stop_monitoring(); - Ok(RaplResponse { + Ok(CpuResponse::Rapl(RaplResponse { cpu_energy_uj: Some(0), dram_energy_uj: Some(0), - }) + })) } } } diff --git a/zeusd/src/routes/cpu.rs b/zeusd/src/routes/cpu.rs index 092c4c74..ab1dfa6c 100644 --- a/zeusd/src/routes/cpu.rs +++ b/zeusd/src/routes/cpu.rs @@ -24,7 +24,7 @@ impl From for CpuCommand { #[actix_web::post("/{cpu_id}/get_index_energy")] #[tracing::instrument( - skip(cpu_id, request, _device_tasks), + skip(request, _device_tasks), fields( cpu_id = %cpu_id, cpu = %request.cpu, @@ -48,6 +48,29 @@ async fn get_index_energy_handler( Ok(HttpResponse::Ok().json(measurement)) } +#[actix_web::get("/{cpu_id}/supports_dram_energy")] +#[tracing::instrument( + skip(_device_tasks), + fields( + cpu_id = %cpu_id, + ) +)] +async fn supports_dram_energy_handler( + cpu_id: web::Path, + _device_tasks: web::Data, +) -> Result { + let now = Instant::now(); + tracing::info!("Received request"); + let cpu_id = cpu_id.into_inner(); + + let answer = _device_tasks + .send_command_blocking(cpu_id, CpuCommand::SupportsDramEnergy, now) + .await?; + + Ok(HttpResponse::Ok().json(answer)) +} + pub fn cpu_routes(cfg: &mut web::ServiceConfig) { cfg.service(get_index_energy_handler); + cfg.service(supports_dram_energy_handler); } diff --git a/zeusd/tests/cpu.rs b/zeusd/tests/cpu.rs index 35c5a92b..49359e48 100644 --- a/zeusd/tests/cpu.rs +++ b/zeusd/tests/cpu.rs @@ -1,5 +1,6 @@ mod helpers; +use zeusd::devices::cpu::DramAvailabilityResponse; use zeusd::devices::cpu::RaplResponse; use zeusd::routes::cpu::GetIndexEnergy; @@ -154,3 +155,21 @@ async fn test_invalid_requests() { .expect("Failed to send request"); assert_eq!(resp.status(), 400); } + +#[tokio::test] +async fn test_supports_dram_energy() { + let app = TestApp::start().await; + let url = format!("http://127.0.0.1:{}/cpu/0/supports_dram_energy", app.port); + let client = reqwest::Client::new(); + + let resp = client + .get(url) + .send() + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 200); + + let dram_response: DramAvailabilityResponse = serde_json::from_str(&resp.text().await.unwrap()) + .expect("Failed to deserialize response body"); + assert_eq!(dram_response.dram_available, true); +} diff --git a/zeusd/tests/helpers/mod.rs b/zeusd/tests/helpers/mod.rs index 610f1ca3..9a563ad3 100644 --- a/zeusd/tests/helpers/mod.rs +++ b/zeusd/tests/helpers/mod.rs @@ -266,7 +266,7 @@ impl_zeusd_request_cpu!(GetIndexEnergy); /// A test application that starts a server over TCP and provides helper methods /// for sending requests and fetching what happened to the fake GPUs. pub struct TestApp { - port: u16, + pub port: u16, observers: Vec, cpu_injectors: Vec, }