-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmeasure
executable file
·289 lines (235 loc) · 10.4 KB
/
measure
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#!/usr/bin/env python3
import sys
import os
import time
import requests
import yaml
from measure import Measure, ST_FAILED
# Driver identity and capabilities; all four are handed to the Prom
# constructor at script entry.
VERSION = "1.2.0"
DESC = "Prometheus measure driver for Optune by Opsani"
HAS_CANCEL = True
PROGRESS_INTERVAL = 30  # presumably seconds between progress reports -- confirm in Measure

# Settings that can be overridden through the process environment.
CONFIG_FPATH = os.environ.get("CONFIG_FPATH", "./config.yaml")
PROM_BASE_URL = os.environ.get("PROMETHEUS_ENDPOINT", "http://prometheus:9090")

# Path prefix appended to the base URL for every Prometheus HTTP API call.
PROM_API_PATH = "/api/v1/"
class Prom(Measure):
    """Prometheus measure driver.

    Reads metric definitions from a YAML config file, queries the Prometheus
    HTTP API for every requested metric and hands the collected samples back
    to the ``Measure`` base class.
    """

    @staticmethod
    def get_config(path=CONFIG_FPATH):
        """Load and validate this driver's section of the config file.

        The section is read from the ``prom`` key unless the
        ``PROMETHEUS_USE_DRIVER_NAME`` environment variable equals ``1``, in
        which case the key is the base name of this script file.

        Args:
            path: Location of the YAML config file.

        Returns:
            The validated section: a dict holding a non-empty ``metrics``
            dict and, optionally, a ``prometheus_endpoint`` string.

        Raises:
            yaml.YAMLError: The file is not valid YAML.
            KeyError: The file has no section for this driver.
            AssertionError: ``metrics`` is missing, empty, not a dict, or
                contains a non-string metric name.
            Exception: Any other validation failure (unknown keys, bad
                endpoint, malformed metric description).
        """
        driver_key = "prom"
        if str(os.environ.get("PROMETHEUS_USE_DRIVER_NAME", "")) == "1":
            driver_key = os.path.basename(__file__)

        try:
            # The context manager closes the file even when parsing fails.
            with open(path) as config_file:
                config = yaml.safe_load(config_file)[driver_key]
        except yaml.YAMLError as e:
            # Assigning a string to ``e.__cause__`` raises TypeError and
            # hides the real problem; chain a descriptive error instead.
            raise yaml.YAMLError(
                "Error parsing config file located at {}: {}".format(path, e)
            ) from e
        except KeyError:
            raise KeyError("No `{}` configuration has been provided in config file located at {}".format(driver_key, path))

        # AssertionError is raised explicitly (instead of via ``assert``) so
        # the checks survive ``python -O`` while keeping the exception type.
        if not (config and config.get("metrics")):
            raise AssertionError(
                'No metrics has been provided at path "{}.metrics" in file located at {}"'.format(driver_key, path)
            )

        bad_top_keys = config.keys() - {"metrics", "prometheus_endpoint"}
        if bad_top_keys:
            raise Exception("Unknown top level key(s) in `{}` section of file {}: {}".format(driver_key, path, bad_top_keys))

        # Validate prometheus_endpoint, if present.
        if "prometheus_endpoint" in config:
            endpoint = config["prometheus_endpoint"]
            if not isinstance(endpoint, str):
                raise Exception("'prometheus_endpoint' in '{}' section must be a string".format(driver_key))
            if not endpoint:
                raise Exception("'prometheus_endpoint' in '{}' section must not be empty".format(driver_key))

        metrics = config["metrics"]
        if not isinstance(metrics, dict):
            raise AssertionError(
                "The `metrics` attribute in `{}` section must be a dict".format(driver_key)
            )

        for m, mdesc in metrics.items():
            if not isinstance(m, str):
                raise AssertionError(
                    "Metric name `{}` must be a string, found {}".format(m, type(m).__name__)
                )
            if not isinstance(mdesc, dict):
                raise Exception(
                    "Metric `{}` data in file {} must be a dict, found {}".format(
                        m, path, type(mdesc).__name__
                    )
                )
            bad_keys = mdesc.keys() - {"query", "unit", "period"}
            if bad_keys:
                # Arguments follow placeholder order: metric, section, file.
                raise Exception(
                    "Unknown attributes for metric `{}` in `{}` section of file {}: {}".format(
                        m, driver_key, path, bad_keys
                    )
                )
            if "query" not in mdesc:
                raise Exception(
                    "Missing `query` attribute for metric `{}` in `{}` section of file {}".format(
                        m, driver_key, path
                    )
                )
            if not isinstance(mdesc["query"], str):
                raise Exception(
                    "Query for metric `{}` must be string, found {}".format(
                        m, type(mdesc["query"]).__name__
                    )
                )
        return config

    def get_prom_metrics(self):
        """Return the metric names known to the Prometheus endpoint.

        (Currently not used.)

        Returns:
            The list of metric names reported by the
            ``label/__name__/values`` API call.

        Raises:
            AssertionError: The server answered with a non-200 status, a
                non-success payload, or an empty / non-list metric set.
            Exception: The response body is not valid JSON.
        """
        base_url = self.get_config().get("prometheus_endpoint", PROM_BASE_URL).rstrip("/")
        url = "%s%slabel/__name__/values" % (base_url, PROM_API_PATH)
        self.debug("Getting url: ", url)
        r = requests.get(url)
        if r.status_code != 200:
            raise AssertionError("Prometheus server returned http code: " + str(r.status_code))
        try:
            data = r.json()
        except Exception as e:
            # ``except Exception`` (not bare) lets SystemExit from
            # handle_cancel and KeyboardInterrupt propagate.
            raise Exception("Failed to parse Prometheus JSON response") from e
        self.debug("Got reponse data: ", data)
        if not ("status" in data and data["status"] == "success"):
            raise AssertionError("Prometheus server did not return status success")
        if "data" not in data:
            raise AssertionError("Prometheus server did not return data in output")
        if not len(data["data"]) > 0:
            raise AssertionError("Prometheus server returned no metrics")
        known_metrics = data["data"]
        if not isinstance(known_metrics, list):
            raise AssertionError
        # Previously the list was computed and then dropped.
        return known_metrics

    # overrides super
    def describe(self):
        """Return a mapping of configured metric name to its descriptor.

        The descriptor carries the ``unit`` from the config when one is
        defined and is an empty dict otherwise.
        """
        cfg = self.get_config()
        return {
            name: ({"unit": mdesc["unit"]} if "unit" in mdesc else {})
            for name, mdesc in cfg["metrics"].items()
        }

    # overrides super
    def handle_cancel(self, signal, frame):
        """Report the cancellation as a failed measurement and exit with 3.

        Args:
            signal: Signal number that triggered the cancellation.
            frame: Interrupted stack frame (unused).
        """
        err = "Exiting due to signal: %s" % signal
        self.print_measure_error(err, ST_FAILED)
        sys.exit(3)

    # helper
    def _gather_metrics(self, cfg, base_url, metric_names, start, end):
        """Query Prometheus for each named metric over ``[start, end]``.

        Names that are absent from ``cfg["metrics"]`` are skipped silently.

        Returns:
            Dict mapping metric name to
            ``{"values": <series list>, "annotation": <query string>}``.

        Raises:
            Exception: A metric's configuration cannot be read or the
                Prometheus query fails.
        """
        metrics = {}
        for metric_name in metric_names:
            # Look up the metric query in the configuration.
            try:
                mdesc = cfg["metrics"].get(metric_name, None)
                if mdesc is None:
                    # metric is missing in config, skipping due to the bug in
                    # OCO asking for derived metrics
                    continue
                metric_query = mdesc["query"]
            except Exception as e:
                raise Exception(
                    'Input configuration for metric "%s" has invalid query'
                    % (metric_name)
                ) from e
            period = mdesc.get("period", 60)

            try:
                m_values = self._query_prom(base_url, metric_query, start, end, period)
                self.debug(
                    "Initial value for metric %s: %s" % (metric_query, m_values)
                )
            except Exception as e:
                raise Exception(
                    "Failed to query Prometheus for metric: %s. Error: %s"
                    % (metric_query, str(e))
                ) from e

            metrics[metric_name] = {"values": m_values, "annotation": metric_query}
        return metrics

    # overrides super
    def measure(self):
        """Wait for the measurement window and collect the requested metrics.

        Reads ``metrics`` and ``control`` (``warmup``, ``duration``,
        ``delay``) from ``self.input_data``, sleeps for
        ``warmup + duration + delay`` seconds, then queries Prometheus for
        the window that starts after the warmup and lasts ``duration``.

        Returns:
            Tuple ``(metrics, annotations)``; ``annotations`` is an empty
            dict.

        Raises:
            Exception: The input is missing ``metrics`` or ``duration``, the
                metric list is invalid, or a Prometheus query fails.
        """
        cfg = self.get_config()

        # Prometheus API URL with no trailing '/'.
        base_url = cfg.get("prometheus_endpoint", PROM_BASE_URL).rstrip("/")

        # Metrics to be collected.
        try:
            in_metrics = self.input_data["metrics"]
        except Exception as e:
            raise Exception('Input configuration is missing "metrics"') from e
        if not isinstance(in_metrics, list) or not in_metrics:
            # Any non-empty list is accepted, so the message says so.
            raise Exception(
                "Invalid list of metrics in input configuration, must specify at least one metric."
            )

        # Query the last 10 minutes up front so a broken query or an
        # unreachable server fails before the (potentially long) sleep.
        self._gather_metrics(cfg, base_url, in_metrics, time.time() - 600, time.time())

        # warmup is optional and falls back to 0 when absent or unparsable.
        try:
            warmup = int(self.input_data["control"]["warmup"])
        except Exception:
            warmup = 0
        try:
            duration = int(self.input_data["control"]["duration"])
        except Exception as e:
            raise Exception('Control configuration is missing "duration"') from e
        delay = int(self.input_data["control"].get("delay", 0))

        start = time.time() + warmup

        # The sleep also covers `delay`, which is outside the queried window.
        self.t_sleep = warmup + duration + delay
        self.debug("Sleeping for %d seconds (%d warmup + %d duration)"
                   % (self.t_sleep, warmup, duration)
                   )
        time.sleep(self.t_sleep)

        metrics = self._gather_metrics(cfg, base_url, in_metrics, start, start + duration)
        annotations = {}
        return (metrics, annotations)

    # Overridden so progress is refreshed before it is printed.
    def print_progress(self, message=None, msg_index=None, stage=None, stageprogress=None):
        """Update ``self.progress`` from elapsed time, then print it.

        Progress is the share of ``self.t_sleep`` elapsed since
        ``self.t_measure_start``, capped at 100. It is left untouched when
        either attribute is not available yet or the sleep time is zero.
        """
        # NOTE(review): t_measure_start is not assigned in this class; it is
        # presumably set by Measure -- confirm.
        t_start = getattr(self, "t_measure_start", None)
        t_sleep = getattr(self, "t_sleep", 0)
        if t_start is not None and t_sleep > 0:
            t_taken = time.time() - t_start
            self.progress = min(100, int(100 * (t_taken / t_sleep)))
        super().print_progress(message, msg_index, stage, stageprogress)

    # helper
    def _query_prom(self, base_url, metric_query, start, end, period=60):
        """Run a ``query_range`` request and return the parsed series.

        Args:
            base_url: Prometheus base URL without a trailing slash.
            metric_query: Plain (not URL-encoded) PromQL expression.
            start: Start of the range as a Unix timestamp.
            end: End of the range as a Unix timestamp.
            period: Query resolution step; formatted as an integer.

        Returns:
            A list with one ``{"id": ..., "data": [(timestamp, value), ...]}``
            dict per returned series; empty when nothing matched. ``id`` is
            the series' ``label:value`` pairs sorted by label and joined with
            spaces, leaving out ``__name__``. ``NaN`` and other samples that
            cannot be parsed as numbers are dropped.

        Raises:
            AssertionError: Non-200 HTTP status or non-success payload.
            Exception: The response body is not valid JSON.
        """
        url = "%s%squery_range" % (base_url, PROM_API_PATH)
        # Let requests URL-encode the parameters: interpolating the raw query
        # into the URL breaks expressions containing characters such as '+'
        # or '&'. NOTE(review): queries that were percent-encoded by hand in
        # the config must be changed back to plain PromQL.
        params = {
            "query": metric_query,
            "start": start,
            "end": end,
            "step": "%i" % period,
        }
        self.debug("Getting url: ", "%s params=%s" % (url, params))
        r = requests.get(url, params=params)
        if r.status_code != 200:
            raise AssertionError(
                "Prometheus server returned http code: " + str(r.status_code)
            )
        try:
            data = r.json()
        except Exception as e:
            # ``except Exception`` (not bare) lets SystemExit from
            # handle_cancel and KeyboardInterrupt propagate.
            raise Exception("Failed to parse Prometheus JSON response") from e
        self.debug("Got response data: ", data)
        if not ("status" in data and data["status"] == "success"):
            raise AssertionError("Prometheus server did not return status success")

        insts = []
        for series in data["data"]["result"]:
            labels = {
                name: value
                for name, value in series["metric"].items()
                if name != "__name__"
            }
            # Label names are unique, so sorting the pairs sorts by name.
            metric_id = " ".join(":".join(pair) for pair in sorted(labels.items()))

            values = []
            for item in series["values"]:
                raw = item[1]
                if raw == "NaN":
                    continue
                try:
                    d = float(raw) if "." in raw else int(raw)
                except ValueError:
                    # e.g. "+Inf": not representable as a sample, skip it.
                    continue
                values.append((item[0], d))
            insts.append({"id": metric_id, "data": values})
        return insts
if __name__ == "__main__":
    # Script entry point: build the driver from the module constants and
    # hand control to its run() method.
    Prom(VERSION, DESC, HAS_CANCEL, PROGRESS_INTERVAL).run()