-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: mlflow_efficiency_audit.py
149 lines (118 loc) · 5.17 KB
/
mlflow_efficiency_audit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
from datetime import datetime
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from dotenv import load_dotenv
# Load environment variables (e.g. MLFLOW_TRACKING_URI) from a local .env file.
load_dotenv()
# Set MLflow Tracking URI from the .env file. Fail fast when it is missing so
# later API calls do not silently fall back to a default local tracking store.
tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
if not tracking_uri:
    raise ValueError("MLFLOW_TRACKING_URI is not set in the .env file.")
mlflow.set_tracking_uri(tracking_uri)
# Shared tracking client used by all helpers below for run/metric queries.
client = MlflowClient()
def fetch_full_metric_history(run_id, metric_name):
    """Return the full recorded history of *metric_name* for run *run_id*.

    Each entry is a ``(timestamp, value)`` tuple. Any lookup failure
    (missing run, unknown metric, backend error) yields an empty list so
    callers can treat "no data" and "unavailable" uniformly.
    """
    try:
        samples = client.get_metric_history(run_id, metric_name)
        return [(sample.timestamp, sample.value) for sample in samples]
    except Exception:
        # Best-effort: absence of a metric is expected (not every run logs GPUs).
        return []
def calculate_gpu_utilization_and_history(run_id, max_gpus=12):
    """Collect GPU utilization history and averages for a run.

    Probes the metrics ``system/gpu_{i}_utilization_percentage`` for
    ``i`` in ``range(max_gpus)``; GPUs with no logged history are skipped.

    Parameters:
        run_id: MLflow run id to query.
        max_gpus: Upper bound on GPU indices to probe (default 12, matching
            the previous hard-coded limit; raise it for larger nodes).

    Returns:
        A 3-tuple ``(gpu_histories, gpu_utilization_per_gpu, overall_avg)``:
        - ``gpu_histories``: ``{"GPU_{i}_History": "Timestamp: t, Value: v; ..."}``
        - ``gpu_utilization_per_gpu``: ``{"GPU_{i}_Average_Utilization (%)": avg}``
        - ``overall_avg``: mean of the per-GPU averages, or ``None`` when
          no GPU logged any utilization data.
    """
    gpu_histories = {}
    gpu_utilization = []
    gpu_utilization_per_gpu = {}
    for i in range(max_gpus):
        metric_name = f"system/gpu_{i}_utilization_percentage"
        history = fetch_full_metric_history(run_id, metric_name)
        if not history:
            continue  # this GPU index logged nothing for the run
        # Store the history as a single human-readable string (one Excel cell).
        gpu_histories[f"GPU_{i}_History"] = "; ".join(
            f"Timestamp: {ts}, Value: {val}" for ts, val in history
        )
        avg_utilization = sum(val for _, val in history) / len(history)
        gpu_utilization.append(avg_utilization)
        gpu_utilization_per_gpu[f"GPU_{i}_Average_Utilization (%)"] = avg_utilization
    # Overall mean across GPUs that actually reported data.
    overall_avg_utilization = (
        sum(gpu_utilization) / len(gpu_utilization) if gpu_utilization else None
    )
    return gpu_histories, gpu_utilization_per_gpu, overall_avg_utilization
def fetch_all_experiment_metrics():
    """Gather metadata, parameters and GPU metrics for every run of every experiment.

    Returns a ``pandas.DataFrame`` with one row per run, including
    reproducibility signals (Git commit, dataset tag, environment
    versioning) and per-GPU utilization columns.
    """
    rows = []
    for experiment in mlflow.search_experiments():
        for run in client.search_runs(experiment_ids=[experiment.experiment_id]):
            tags = run.data.tags
            # GPU utilization: per-GPU histories, per-GPU averages, overall mean.
            histories, per_gpu_avgs, overall_avg = calculate_gpu_utilization_and_history(
                run.info.run_id
            )
            # Which kind of versioned environment (if any) was recorded?
            if "mlflow.conda_env" in tags:
                versioned_env = "Conda"
            elif "mlflow.requirements" in tags:
                versioned_env = "Requirements"
            elif "mlflow.docker" in tags:
                versioned_env = "Docker"
            else:
                versioned_env = "No"
            start, end = run.info.start_time, run.info.end_time
            row = {
                "Experiment Name": experiment.name,
                "Run ID": run.info.run_id,
                "Run Name": run.info.run_name,
                "User": tags.get("mlflow.user", "Unknown"),
                "Source": tags.get("mlflow.source.name", "Unknown"),
                "Status": run.info.status,
                "Start Time": human_readable_date(start),
                "End Time": human_readable_date(end),
                # Timestamps are in milliseconds; duration only for finished runs.
                "Duration (s)": (end - start) / 1000 if end else None,
                "Git Commit": tags.get("mlflow.source.git.commit", "No"),
                "Dataset": tags.get("mlflow.data.dataset", "No"),
                "Versioned Environment": versioned_env,
                # All logged parameters flattened into one readable cell.
                "Parameters": ", ".join(f"{key}: {value}" for key, value in run.data.params.items()),
                "Average GPU Utilization (%)": overall_avg,
            }
            row.update(histories)
            row.update(per_gpu_avgs)
            rows.append(row)
    return pd.DataFrame(rows)
def human_readable_date(timestamp):
    """Format a millisecond epoch timestamp as ``DD/MM/YYYY HH:MM:SS`` (local time).

    Returns ``None`` when *timestamp* is ``None`` (e.g. a run that never ended).
    """
    if timestamp is None:
        return None
    # MLflow stores timestamps in milliseconds; fromtimestamp expects seconds.
    moment = datetime.fromtimestamp(timestamp / 1000)
    return moment.strftime("%d/%m/%Y %H:%M:%S")
def generate_excel(all_experiment_data, output_file="experiment_metrics_summary.xlsx"):
    """Write the experiment metrics DataFrame to a single-sheet Excel workbook.

    Parameters:
        all_experiment_data: DataFrame of per-run metrics to export.
        output_file: Destination ``.xlsx`` path.
    """
    workbook = Workbook()
    # Reuse the default sheet created by Workbook() instead of adding a new one.
    sheet = workbook.active
    sheet.title = "Experiment Metrics"
    for excel_row in dataframe_to_rows(all_experiment_data, index=False, header=True):
        sheet.append(excel_row)
    workbook.save(output_file)
    print(f"Saved metrics to {output_file}")
if __name__ == "__main__":
    # Entry point: pull everything from the tracking server, then export to Excel.
    experiment_data = fetch_all_experiment_metrics()
    if experiment_data.empty:
        print("No experiment data found.")
    else:
        generate_excel(experiment_data)