-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservo_metrics.py
46 lines (40 loc) · 1.64 KB
/
servo_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import socket
import aioprometheus
import servo
from typing import List
# TODO: The service and counters below live at module scope rather than on
# the connector instance, as a poor man's multiservo support. This needs a
# new design.

# Metrics service started and stopped by MetricsConnector's startup/shutdown
# event handlers.
service: aioprometheus.Service = aioprometheus.Service()

# Incremented by MetricsConnector.after_measure; labelled with this host's name.
measure_counter: aioprometheus.Counter = aioprometheus.Counter(
    "measurements",
    "Number of events.",
    const_labels={"host": socket.gethostname()},
)
service.register(measure_counter)

# Incremented by MetricsConnector.after_adjust; labelled with this host's name.
adjust_counter: aioprometheus.Counter = aioprometheus.Counter(
    "adjusts",
    "Number of events.",
    const_labels={"host": socket.gethostname()},
)
service.register(adjust_counter)
@servo.metadata(
    description="Tracks metrics for servo events.",
    version="0.5.0",
    # Canonical GitHub URL (the previous value pointed at a mirror domain).
    homepage="https://github.com/opsani/servox",
    license=servo.License.apache2,
    maturity=servo.Maturity.experimental,
)
class MetricsConnector(servo.BaseConnector):
    """Count measure and adjust events and serve them as Prometheus metrics.

    The metrics service and both counters are module-level objects, so every
    instance of this connector in the process uses the same ones.
    """

    @servo.on_event()
    async def startup(self) -> None:
        """Start the module-level metrics service on 127.0.0.1 and log its URL."""
        await service.start(addr="127.0.0.1")
        self.logger.info(f"serving prometheus metrics on: {service.metrics_url}")

    @servo.on_event()
    async def shutdown(self) -> None:
        """Stop the module-level metrics service."""
        await service.stop()

    @servo.after_event(servo.Events.measure)
    async def after_measure(self, results: List[servo.EventResult], **kwargs) -> None:
        """Increment the measurements counter after a measure event.

        The increment is labelled with this connector's optimizer id.
        ``results`` and ``kwargs`` are accepted but not used.
        """
        self.logger.info("after_measure")
        measure_counter.inc({"servo": self.optimizer.id})

    @servo.after_event(servo.Events.adjust)
    async def after_adjust(self, results: List[servo.EventResult]) -> None:
        """Increment the adjusts counter after an adjust event.

        The increment is labelled with this connector's optimizer id.
        ``results`` is accepted but not used.
        """
        self.logger.info("after_adjust")
        adjust_counter.inc({"servo": self.optimizer.id})

    class Config:
        # NOTE(review): presumably the pydantic model config inherited via
        # servo.BaseConnector - confirm against the base class.
        arbitrary_types_allowed = True