-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcastai_cluster_utils.py
111 lines (83 loc) · 3.49 KB
/
castai_cluster_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import csv
import json
from collections import Counter
from datetime import datetime
from typing import Optional

import requests
from requests import RequestException
def get_clusters(castai_api_key: str, timeout: float = 30.0) -> Optional[dict]:
    """Fetch the external clusters visible to an API key from api.cast.ai.

    Args:
        castai_api_key: Key sent in the ``X-API-Key`` request header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on a stalled connection.

    Returns:
        The decoded JSON response body, or ``None`` when the request fails,
        the server answers with an error status, or the body is not valid
        JSON. The error is printed in each of those cases.
    """
    url = "https://api.cast.ai/v1/kubernetes/external-clusters"
    headers = {
        "accept": "application/json",
        "X-API-Key": castai_api_key,
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        # Decode inside the try so a malformed body is reported the same way
        # as a transport error instead of escaping as an uncaught exception.
        return response.json()
    except (RequestException, ValueError) as e:
        print(f'An error has occurred while fetching data from the API: {e}')
        return None
def extract_cluster_ids(clstrs: Optional[dict]) -> list[str]:
    """Return the ``id`` of every cluster listed in a clusters response.

    Args:
        clstrs: Decoded clusters payload that keeps its entries under the
            ``items`` key. ``None`` is accepted because ``get_clusters``
            returns it when the API call fails.

    Returns:
        The cluster IDs in response order, or an empty list when the payload
        is ``None``/empty or carries no ``items``.
    """
    if not clstrs:
        return []
    # `or []` also covers a payload whose "items" key is missing or null.
    return [clstr['id'] for clstr in clstrs.get('items') or []]
def get_nodes(clstr_id: str, castai_api_key: str, timeout: float = 30.0) -> Optional[dict]:
    """Fetch the nodes of one external cluster from api.cast.ai.

    Args:
        clstr_id: Cluster identifier interpolated into the request path.
        castai_api_key: Key sent in the ``X-API-Key`` request header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on a stalled connection.

    Returns:
        The decoded JSON response body, or ``None`` when the request fails,
        the server answers with an error status, or the body is not valid
        JSON. The error is printed in each of those cases.
    """
    url = f"https://api.cast.ai/v1/kubernetes/external-clusters/{clstr_id}/nodes"
    headers = {
        "accept": "application/json",
        "X-API-Key": castai_api_key,
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        # Decode inside the try so a malformed body is reported the same way
        # as a transport error instead of escaping as an uncaught exception.
        return response.json()
    except (RequestException, ValueError) as e:
        print(f'An error has occurred while fetching data from the API: {e}')
        return None
def extract_instance_types(nodes: Optional[dict]) -> list[str]:
    """Return the ``instanceType`` of every node listed in a nodes response.

    Args:
        nodes: Decoded nodes payload that keeps its entries under the
            ``items`` key. ``None`` is accepted because ``get_nodes`` returns
            it when the API call fails.

    Returns:
        The instance type of each node in response order (duplicates kept),
        or an empty list when the payload is ``None``/empty or carries no
        ``items``.
    """
    if not nodes:
        return []
    # `or []` also covers a payload whose "items" key is missing or null.
    return [node['instanceType'] for node in nodes.get('items') or []]
def build_cluster_dictionary(clstr_ids: list[str], castai_api_key) -> dict:
    """Snapshot the instance types of the given clusters under one timestamp.

    Args:
        clstr_ids: IDs of the clusters whose nodes should be fetched.
        castai_api_key: API key forwarded to ``get_nodes`` for every cluster.

    Returns:
        A single-entry dict whose key is the ``datetime.now()`` value taken
        before any fetch starts and whose value maps each cluster ID to the
        list produced by ``extract_instance_types`` for that cluster.
    """
    # Take the timestamp first so it marks the start of the collection run.
    snapshot_time = datetime.now()
    instances_by_cluster = {}
    for cluster_id in clstr_ids:
        node_payload = get_nodes(cluster_id, castai_api_key)
        instances_by_cluster[cluster_id] = extract_instance_types(node_payload)
    return {snapshot_time: instances_by_cluster}
def display_most_used_instance_types(date_clstr_dict: dict) -> list[dict]:
    """Print and collect the most frequently used instance type(s) per cluster.

    Args:
        date_clstr_dict: Maps a snapshot date to a dict that maps each
            cluster ID to the list of instance type names in that cluster.

    Returns:
        One dict per (date, cluster) pair with the keys ``date``,
        ``cluster_id``, ``instances`` (comma-joined input list),
        ``most_used_types`` (comma-joined; every type tied for the highest
        count, in first-seen order) and ``usage_count``. A cluster without
        instances yields empty strings and a ``usage_count`` of 0.
    """
    results = []
    for date, clstr_instances in date_clstr_dict.items():
        print(f'Date: {date}')
        print('Most used instance types per cluster:')

        for clstr_id, instances in clstr_instances.items():
            # Counter keeps first-seen order, so ties are reported in the
            # order the types appear in the input list.
            instance_type_counts = Counter(instances)

            # default=0 keeps a cluster with no nodes from raising ValueError.
            max_count = max(instance_type_counts.values(), default=0)
            most_used_types = [
                instance_type
                for instance_type, count in instance_type_counts.items()
                if count == max_count
            ]

            print(f'Cluster ID: {clstr_id}')
            # dict(...) so the counts print as a plain mapping rather than
            # with the Counter(...) wrapper.
            print(f'All instances: {dict(instance_type_counts)}')
            print(f'Most used instance type(s): {most_used_types}')
            print(f'Usage count: {max_count}')

            results.append({
                'date': date,
                'cluster_id': clstr_id,
                'instances': ','.join(instances),
                'most_used_types': ','.join(most_used_types),
                'usage_count': max_count,
            })
            print('\n')

    return results
def write_results_to_csv(results: list[dict], csv_filename: str) -> None:
    """Append result rows to a CSV file, writing the header row only once.

    Args:
        results: Row dicts keyed by ``date``, ``cluster_id``, ``instances``,
            ``most_used_types`` and ``usage_count``.
        csv_filename: Path of the CSV file; it is created when missing.
    """
    fieldnames = ['date', 'cluster_id', 'instances', 'most_used_types', 'usage_count']

    # newline='' hands line-ending control to the csv module; without it
    # Windows output gets an extra blank line after every row.
    with open(csv_filename, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        # Append mode starts at the end of the file, so position 0 means the
        # file is new or empty and still needs its header.
        if csvfile.tell() == 0:
            writer.writeheader()
        writer.writerows(results)