-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparking.py
233 lines (170 loc) · 9.02 KB
/
parking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import base64
import hashlib
import hmac
import platform
import sys
import os
import threading
import time
import requests
import yaml
import socketio
import argparse
# Defaults used when the configuration does not provide "host" / "https".
DEFAULT_HTTPS = True
DEFAULT_HOST = "hackeps.ddns.net/backend"

# True only when the machine architecture string starts with "arm"/"aarch"
# AND the node name contains "raspberrypi" or "rpi6". The node name is only
# queried when the architecture check passes (short-circuit `and`).
IS_RASPBERRY = platform.machine().startswith(("arm", "aarch")) and any(
    marker in os.uname().nodename for marker in ("raspberrypi", "rpi6")
)
if IS_RASPBERRY:
import RPi.GPIO as GPIO #pip install python3-rpi.gpio
else:
import keyboard #pip install keyboard
def generate_hmac_signature(client_id, secret_key: str, timestamp, data):
    """Return the base64-encoded HMAC-SHA256 signature of a request.

    The signed message is the plain concatenation of ``client_id``,
    ``timestamp`` and ``data`` (in that order, no separators), keyed with the
    encoded ``secret_key``.

    Args:
        client_id: Identifier of the caller; formatted into the message.
        secret_key: Shared secret used as the HMAC key.
        timestamp: Value formatted into the message after ``client_id``.
        data: Value formatted into the message after ``timestamp``.

    Returns:
        The signature digest as a base64 text string.
    """
    payload = f"{client_id}{timestamp}{data}".encode()
    mac = hmac.new(secret_key.encode(), payload, digestmod=hashlib.sha256)
    return base64.b64encode(mac.digest()).decode()
def generate_auth_header(client_id, secret_key, data):
    """Build the ``hmac <client_id>:<signature>:<timestamp>`` auth string.

    The timestamp is the current Unix time truncated to whole seconds; the
    signature is produced by ``generate_hmac_signature`` over the same
    ``client_id``, timestamp and ``data``.
    """
    issued_at = int(time.time())
    return "hmac {}:{}:{}".format(
        client_id,
        generate_hmac_signature(client_id, secret_key, issued_at, data),
        issued_at,
    )
class ParkingSocket:
    """Socket.IO client that reports one parking's status and occupation.

    Every payload it emits carries an ``Authorization`` value built by
    ``generate_auth_header`` from the parking id, the secret and the event
    name. All traffic uses the ``/socket`` namespace.
    """

    def __init__(self, parkingId: str, secret, host=DEFAULT_HOST, httpsMode=DEFAULT_HTTPS, retry_timeout=10):
        """Prepare the client; no network activity happens here.

        Args:
            parkingId: Identifier sent in payloads and used to sign them.
            secret: Shared secret used to sign the payloads.
            host: Server host (and optional path prefix), without scheme.
            httpsMode: Truthy to use ``wss://``, falsy to use ``ws://``.
            retry_timeout: Seconds to wait between connection attempts.
        """
        self.sio = socketio.Client()
        self.parkingId = parkingId
        self.secret = secret
        self.host = f"ws{'s' if httpsMode else ''}://{host}/socket"
        self.retry_timeout = retry_timeout
        self.running = True
        # Set by stop() so connect() can leave its retry delay immediately.
        self._stop_event = threading.Event()
        self.register_events()

    def register_events(self):
        """Attach the connect/disconnect/error handlers to the client."""

        @self.sio.on('connect', namespace='/socket')
        def connect():
            print("Conexión establecida con el servidor en /socket")
            # NOTE(review): unlike the other events, this payload carries no
            # "parkingId" key -- confirm the server does not require it.
            self.sio.emit(
                "status_parking",
                {
                    "status": "online",
                    "Authorization": generate_auth_header(self.parkingId, self.secret, "status_parking"),
                },
                namespace="/socket",
            )

        @self.sio.on('disconnect', namespace='/socket')
        def disconnect():
            print("Desconectado del servidor")
            # Best effort only: the link is going away, so the emit may be
            # rejected by the client. That must not break the handler.
            try:
                self.sio.emit(
                    "status_parking",
                    {
                        "status": "offline",
                        "parkingId": self.parkingId,
                        "Authorization": generate_auth_header(self.parkingId, self.secret, "status_parking"),
                    },
                    namespace="/socket",
                )
            except socketio.exceptions.SocketIOError as e:
                print("Error al notificar la desconexión:", str(e))

        @self.sio.on('connect_error', namespace='/socket')
        def connect_error(data):
            print("Error al conectar:", data)

        @self.sio.on('error', namespace='/socket')
        def error(data):
            print("Error:", data)

    def connect(self):
        """Connect and stay connected until ``stop()`` is called.

        Blocks the calling thread: each iteration connects, waits for the
        connection to end, then retries after ``retry_timeout`` seconds unless
        the client has been stopped. Intended to run in its own thread.
        """
        while self.running:
            try:
                print("Intentando conectar a", self.host)
                self.sio.connect(self.host, transports=["websocket"])
                self.sio.wait()
            except Exception as e:
                # Retry boundary: any failure is reported and retried.
                print("Error al intentar conectar:", str(e))
            if not self.running:
                break
            print("Intentando reconectar en", self.retry_timeout, "segundos")
            # Interruptible delay: returns early as soon as stop() is called.
            self._stop_event.wait(self.retry_timeout)
        self.sio.disconnect()
        print("Cliente Socket.IO detenido")

    def stop(self):
        """Ask ``connect()`` to finish.

        Besides clearing the ``running`` flag, this wakes a pending retry
        delay and requests a disconnect, so a ``connect()`` call that is
        blocked waiting on the connection is meant to return instead of
        keeping its thread alive indefinitely.
        """
        self.running = False
        self._stop_event.set()
        self.sio.disconnect()

    def emitChange(self, occupation):
        """Send the new occupation to the server.

        A Socket.IO error (for example, emitting while not connected) is
        reported and swallowed so that the caller's sensor handling keeps
        running; the change is simply not delivered.
        """
        try:
            self.sio.emit(
                "change_parking",
                {
                    "parkingId": self.parkingId,
                    "occupation": occupation,
                    "Authorization": generate_auth_header(self.parkingId, self.secret, "change_parking"),
                },
                namespace="/socket",
            )
        except socketio.exceptions.SocketIOError as e:
            print("Error al enviar el cambio de ocupación:", str(e))
class Parking:
    """Counts cars through two break-beam sensors and reports the occupation.

    The occupation is stored in a YAML configuration file and pushed to the
    server through a ``ParkingSocket``. On a Raspberry Pi the sensors are read
    through GPIO; elsewhere two keyboard keys simulate them.
    """

    # Seconds to wait for the HTTP request in getParking() before giving up
    # on that attempt (without it a stalled server blocks start-up forever).
    HTTP_TIMEOUT = 10
    # Seconds between keyboard polls when the sensors are simulated.
    POLL_INTERVAL = 0.01

    def __init__(self, confPath="conf.yaml", BEAM_IN_PIN=17, BEAM_OUT_PIN=27, SIMULATED_IN_KEY="space", SIMULATED_OUT_KEY="enter", retry_timeout=10):
        """Load the configuration, fetch the parking and start the socket.

        Blocks until the parking data has been fetched (see ``getParking``).

        Args:
            confPath: Path of the YAML configuration file (read and rewritten).
            BEAM_IN_PIN: GPIO pin (BCM numbering) of the entry sensor.
            BEAM_OUT_PIN: GPIO pin (BCM numbering) of the exit sensor.
            SIMULATED_IN_KEY: Key that simulates the entry sensor off-Pi.
            SIMULATED_OUT_KEY: Key that simulates the exit sensor off-Pi.
            retry_timeout: Seconds between retries (HTTP fetch and socket).
        """
        self.confPath = confPath
        self.conf = {}
        self.BEAM_IN_PIN = BEAM_IN_PIN
        self.BEAM_OUT_PIN = BEAM_OUT_PIN
        self.SIMULATED_IN_KEY = SIMULATED_IN_KEY
        self.SIMULATED_OUT_KEY = SIMULATED_OUT_KEY
        self.retry_timeout = retry_timeout
        self.beam_status = False
        self.readConf()
        self.parking = self.getParking()
        self.conf["size"] = self.parking.get("size")
        self.writeConf()
        self.socket = ParkingSocket(
            self.conf.get('parkingId'),
            self.conf.get('secret'),
            self.conf.get("host", DEFAULT_HOST),
            self.conf.get("https", DEFAULT_HTTPS),
            retry_timeout=retry_timeout,
        )
        # Daemon thread: a socket stuck waiting on the network must not keep
        # the interpreter alive after the main loop has ended.
        threading.Thread(target=self.socket.connect, daemon=True).start()

    def readConf(self):
        """Load the YAML configuration into ``self.conf``.

        An empty file is treated as an empty mapping, and a missing or falsy
        "occupation" entry is normalised to 0.
        """
        with open(self.confPath, "r") as f:
            # safe_load returns None for an empty document.
            self.conf = yaml.safe_load(f) or {}
        self.conf["occupation"] = self.conf.get("occupation") or 0

    def writeConf(self):
        """Write ``self.conf`` back to the configuration file."""
        with open(self.confPath, "w") as f:
            yaml.dump(self.conf, f)

    def getParking(self):
        """Fetch this parking's data from the REST API, retrying until it works.

        Returns:
            The decoded JSON body of the first HTTP 200 response.

        Exits the process with status 0 if interrupted with Ctrl+C, whether
        during a request or during the delay between attempts.
        """
        parkingId = self.conf.get("parkingId")
        host = self.conf.get("host", DEFAULT_HOST)
        httpsMode = self.conf.get("https", DEFAULT_HTTPS)
        url = f"http{'s' if httpsMode else ''}://{host}/api/v1/parking/{parkingId}"
        try:
            while True:
                try:
                    response = requests.get(url, timeout=self.HTTP_TIMEOUT)
                    if response.status_code == 200:
                        return response.json()
                    print(f"Error getting parking data: {response.status_code}:\n{response.text}")
                except Exception as e:
                    # Retry boundary: report whatever went wrong and try again.
                    print(f"Error getting parking data: {str(e)}")
                print(f"Trying again in {self.retry_timeout} seconds")
                time.sleep(self.retry_timeout)
        except KeyboardInterrupt:
            print("\nExiting program.")
            sys.exit(0)

    def beam_broken(self):
        """Read both sensors.

        Returns:
            A ``(any_broken, entry_broken)`` pair: whether either beam is
            currently broken, and whether the entry beam is. On the Pi a beam
            counts as broken when its pin reads 0; otherwise the simulated
            keys are checked.
        """
        if IS_RASPBERRY:
            entry_broken = GPIO.input(self.BEAM_IN_PIN) == 0
            exit_broken = GPIO.input(self.BEAM_OUT_PIN) == 0
        else:
            entry_broken = keyboard.is_pressed(self.SIMULATED_IN_KEY)
            exit_broken = keyboard.is_pressed(self.SIMULATED_OUT_KEY)
        return entry_broken or exit_broken, entry_broken

    def break_beam_callback(self):
        """React to a sensor reading.

        Nothing happens unless the combined beam state changed since the last
        call. When a beam becomes broken the occupation is incremented (entry
        beam) or decremented (otherwise), clamped to ``[0, size]``, persisted
        and emitted. When ``size`` is unknown only the lower bound applies.
        """
        broken, entering = self.beam_broken()
        if broken == self.beam_status:
            return
        if broken:
            print("Beam broken")
            size = self.conf.get("size")
            occupation = (self.conf.get("occupation") or 0) + (1 if entering else -1)
            occupation = max(occupation, 0)
            if size is not None:
                occupation = min(occupation, size)
            self.conf["occupation"] = occupation
            self.writeConf()
            print(f"Occupation: {occupation}/{self.conf.get('size')}")
            self.socket.emitChange(occupation)
        else:
            print("Beam unbroken")
        self.beam_status = broken

    def start(self):
        """Run until Ctrl+C, then release GPIO (on the Pi) and stop the socket."""
        try:
            if IS_RASPBERRY:
                GPIO.setmode(GPIO.BCM)
                GPIO.setup(self.BEAM_IN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
                GPIO.setup(self.BEAM_OUT_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
                GPIO.add_event_detect(self.BEAM_IN_PIN, GPIO.BOTH, callback=lambda x: self.break_beam_callback())
                GPIO.add_event_detect(self.BEAM_OUT_PIN, GPIO.BOTH, callback=lambda x: self.break_beam_callback())
                print("Running on Raspberry Pi. Press Ctrl+C to exit.")
                # Main loop: the GPIO edge callbacks do the work, so only keep
                # the main thread alive -- sleeping instead of spinning.
                while True:
                    time.sleep(1)
            else:
                print(
                    f"Running on PC. Press '{self.SIMULATED_IN_KEY}' to simulate sensor break in. "
                    f"Press '{self.SIMULATED_OUT_KEY}' to simulate sensor break out. Press Ctrl+C to exit."
                )
                # Main loop: poll the simulated sensors, yielding the CPU
                # between polls.
                while True:
                    self.break_beam_callback()
                    time.sleep(self.POLL_INTERVAL)
        except KeyboardInterrupt:
            print("\nExiting program.")
        finally:
            if IS_RASPBERRY:
                GPIO.cleanup()
            else:
                print("Program ended.")
            self.socket.stop()
def build_arg_parser():
    """Return the command-line parser for the parking module."""
    parser = argparse.ArgumentParser(description="Parking Module")
    parser.add_argument('--conf', type=str, required=False, default='conf.yaml', help="Archivo de configuración.")
    parser.add_argument('--beam-in-pin', type=int, required=False, default=17, help="Pin del sensor de entrada.")
    parser.add_argument('--beam-out-pin', type=int, required=False, default=27, help="Pin del sensor de salida.")
    parser.add_argument('--simulated-in-key', type=str, required=False, default='space', help="Tecla para simular sensor de entrada.")
    parser.add_argument('--simulated-out-key', type=str, required=False, default='enter', help="Tecla para simular sensor de salida.")
    parser.add_argument('--retry-timeout', type=int, required=False, default=10, help="Tiempo de espera entre reintentos.")
    return parser


def main():
    """Parse the command line, build the ``Parking`` and run it until exit."""
    args = build_arg_parser().parse_args()
    parking = Parking(
        confPath=args.conf,
        BEAM_IN_PIN=args.beam_in_pin,
        BEAM_OUT_PIN=args.beam_out_pin,
        SIMULATED_IN_KEY=args.simulated_in_key,
        SIMULATED_OUT_KEY=args.simulated_out_key,
        retry_timeout=args.retry_timeout,
    )
    parking.start()


# Only run when executed as a script, so importing this module does not parse
# sys.argv or start talking to the hardware and the server.
if __name__ == "__main__":
    main()