-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathheartbeat.py
114 lines (94 loc) · 3.87 KB
/
heartbeat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import re
import time
import asyncio
import sys
import psutil
from events import Events
class Heartbeat:
def __init__(self, config, servoController, motorController, alsa, lightsController, powerPlant):
driverConfig = config["DRIVER"]
self.heartbeatInterval = float(driverConfig["MaxHeartbeatInvervalS"])
self.servoController = servoController
self.motorController = motorController
self.alsa = alsa
self.ssidRegex = re.compile(r"ESSID:\"(.+?)\"")
self.qualityRegex = re.compile(r"Link Quality=([^ ]+)")
self.signalRegex = re.compile(r"Signal level=(.*? dBm)")
self.lastHeartbeat = time.time()
self.task = None
self.lightsController = lightsController
self.powerPlant = powerPlant
self.resetHeartbeatData()
loop = asyncio.get_event_loop()
self.task = loop.create_task(self.heartbeatLoop())
lastHeartbeat = -1
heartbeatStop = False
def resetHeartbeatData(self):
self.lastHeartbeatData = {
"SSID": "-",
"Quality": "-",
"Signal": "-",
"Volume": 0,
"CPU": "-",
"Lights": False,
"BatteryPercent": 0,
"BatteryCharging": False,
"InvalidState": True,
}
async def heartbeatLoop(self):
print("Heartbeat starting...")
try:
while True:
if (time.time() - self.lastHeartbeat) > self.heartbeatInterval:
if not self.heartbeatStop:
self.motorController.setBearing("0", False)
await self.servoController.lookStop()
self.heartbeatStop = True
else:
self.heartbeatStop = False
newHeartbeatData = await self.collectHeartbeatData()
if newHeartbeatData["BatteryCharging"] != self.lastHeartbeatData["BatteryCharging"]:
if newHeartbeatData["BatteryCharging"]:
Events.getInstance().fireOnCharger()
else:
Events.getInstance().fireOffCharger()
self.lastHeartbeatData = newHeartbeatData
await asyncio.sleep(0.5)
except asyncio.CancelledError:
print("Heartbeat stopped")
except Exception as e:
print("Unexpected exception in heartbeat: " + str(e))
async def collectHeartbeatData(self):
try:
proc = await asyncio.create_subprocess_shell(
"iwconfig wlan0",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
wifiInfo = stdout.decode()
ssidMatch = self.ssidRegex.search(wifiInfo)
ssid = ssidMatch.group(1) if ssidMatch else "-"
qualityMatch = self.qualityRegex.search(wifiInfo)
quality = qualityMatch.group(1) if qualityMatch else "-"
signalMatch = self.signalRegex.search(wifiInfo)
signal = signalMatch.group(1) if signalMatch else "-"
volume = int(self.alsa.getVolume())
cpuIdle = psutil.cpu_percent()
batteryInfo = self.powerPlant.getBatteryInfo()
return {
"SSID": ssid,
"Quality": quality,
"Signal": signal,
"Volume": volume,
"CPU": cpuIdle,
"Lights": self.lightsController.lightsStatus,
"BatteryPercent": batteryInfo[0],
"BatteryCharging": batteryInfo[1],
}
except Exception as ex:
print(str(ex), file=sys.stderr)
self.resetHeartbeatData()
return self.lastHeartbeatData
def onHeartbeatReceived(self):
self.lastHeartbeat = time.time()
return self.lastHeartbeatData