-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaether-security.py
242 lines (194 loc) · 6.38 KB
/
aether-security.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# GrovePi
from grovepi import *
# Firebase wrapper
from pyrebase import pyrebase
# Password input
from getpass import getpass
# Firebase login error handling
from requests.exceptions import HTTPError
import json
# QR code
import qrcode
# OLED display
from luma.core.interface.serial import i2c
from luma.oled.device import sh1106
# Datetime for logging
from datetime import datetime
# WebRTC
import asyncio
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
import platform
from threading import Thread
# sleep
from time import sleep
# Firebase project configuration, passed to pyrebase.initialize_app() below.
# NOTE(review): the API key is committed in source - confirm this is intended.
FIREBASE_CONFIG = {
"apiKey": "AIzaSyBb__72a_Y7ruiHQiG9gGK0qUmrbnESVvE",
"authDomain": "aether-iot.firebaseapp.com",
"databaseURL": "https://aether-iot.firebaseio.com",
"storageBucket": "aether-iot.appspot.com"
}
# Database path segments: this device's security node is
# <uid>/DEVICE/SECURITY, and WebRTC session descriptions live under .../SDP.
DEVICE = "pi-1"
SECURITY = "security"
SDP = "sdp"
# Host part of the URL encoded into the QR code (WEB_DOMAIN + "/m/" + uid).
WEB_DOMAIN = "aether-iot.web.app"
# Firebase initialization: one app handle, plus its auth and database clients
firebase = pyrebase.initialize_app(FIREBASE_CONFIG)
auth = firebase.auth()
db = firebase.database()
# Peer connections for WebRTC: answer() adds to this set and
# close_all_connections() closes and empties it.
pcs = set()
# OLED panel initialization: SH1106 display on I2C port 1, address 0x3C
# (rotate=1 is presumably a 90 degree rotation - confirm against luma.oled)
serial = i2c(port=1, address=0x3C)
oled = sh1106(serial, rotate=1, height=128)
# GrovePi pin configuration (pin numbers passed to pinMode/digitalWrite)
buzzer = 2
led = 3
lock = 4
# GrovePi pin initialization: all three are driven as outputs
pinMode(buzzer, "OUTPUT")
pinMode(led, "OUTPUT")
pinMode(lock, "OUTPUT")
# Firebase login + QR code generation
#
# Prompt for credentials until Firebase accepts them. On success the module
# globals `user` (sign-in response), `uid` (its "localId") and `qr_img`
# (128x128 1-bit image encoding WEB_DOMAIN + "/m/" + uid) are available to
# the rest of the script.
#
# The credentials were previously hard-coded to a placeholder value, so a
# rejected sign-in retried forever with the same input. Asking the user on
# every attempt is what makes the retry loop meaningful.
while True:
    print("=================================\n"
          " Aether IoT | Sign in with email \n"
          "=================================")
    email = input("Email : ")
    password = getpass("Password: ")
    try:
        user = auth.sign_in_with_email_and_password(email, password)
    except HTTPError as error:
        # The second exception argument is expected to be the JSON response
        # body; fall back to the plain exception text when it is missing or
        # not in the expected shape instead of crashing the prompt loop.
        try:
            error_msg = json.loads(error.args[1])["error"]["message"]
        except (IndexError, KeyError, TypeError, ValueError):
            error_msg = str(error)
        print("\n [!] %s\n" % error_msg.replace("_", " "))
    else:
        break

uid = user["localId"]
qr = qrcode.QRCode(
    version=1,
    error_correction=qrcode.constants.ERROR_CORRECT_M,
    box_size=4,
    border=1
)
qr.add_data(WEB_DOMAIN + "/m/" + uid)
# Scale to the 128x128 panel and convert to 1-bit for the OLED.
qr_img = qr.make_image().resize((128, 128)).convert("1")
print("\nSIGNED IN\n")
# log with datetime
def log(component, *args):
    """Print one timestamped log line to stdout.

    The line has the form ``dd/mm/YYYY HH:MM:SS [component] arg1 arg2 ...``.

    component -- short tag naming the subsystem (e.g. "CTRL", "SDP").
    *args     -- message parts, joined with single spaces. Each part is
                 converted with str(), so non-string values (numbers, None,
                 booleans) can be logged without raising TypeError.
    """
    timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print(timestamp, "[%s]" % component, " ".join(map(str, args)))
# set alarm state
def setAlarm(on=False):
    """Switch the alarm on or off.

    The buzzer and the LED are always driven together: both high when
    `on` is truthy, both low otherwise. The new state is logged first.
    """
    level = 1 if on else 0
    log("CTRL", "alarm on" if on else "alarm off")
    digitalWrite(buzzer, level)
    digitalWrite(led, level)
# set lock state
def setLock(locked=True):
    """Lock or unlock the door.

    The lock output is active when the door is *unlocked*: a truthy
    `locked` writes 0 to the lock pin, a falsy one writes 1.
    """
    if not locked:
        log("CTRL", "door unlocked")
        digitalWrite(lock, 1)
        return
    log("CTRL", "door locked")
    digitalWrite(lock, 0)
# set QR code display state
def displayQR(display=True):
    """Show the sign-in QR code on the OLED panel, or clear the panel.

    display -- truthy draws `qr_img`; falsy clears the display.
    """
    if not display:
        log("MSG", "QR hiding")
        oled.clear()
        return
    log("MSG", "QR showing")
    oled.display(qr_img)
# reply with answer to offerer in WebRTC
async def answer(sdp):
    """Accept a WebRTC offer and publish the matching answer to Firebase.

    sdp -- mapping with the offer's "sdp" text and its "type".

    A new peer connection is created and tracked in `pcs`, the local camera
    is opened as the media source, one track is attached for each audio or
    video transceiver the source can serve, and the resulting local
    description is written to this device's SDP node in the database.
    """
    log("SDP", "offer received")
    remote_offer = RTCSessionDescription(sdp=sdp["sdp"], type=sdp["type"])

    peer = RTCPeerConnection()
    pcs.add(peer)

    @peer.on("iceconnectionstatechange")
    async def on_iceconnectionstatechange():
        state = peer.iceConnectionState
        log("ICE", "%s" % state)
        # A failed connection is closed and dropped from the tracked set.
        if state == "failed":
            await peer.close()
            pcs.discard(peer)

    # Capture source depends on the host OS: avfoundation on macOS,
    # the first V4L2 device everywhere else.
    capture_options = {"framerate": "5", "video_size": "640x480"}
    if platform.system() == "Darwin":
        source, container = "default:none", "avfoundation"
    else:
        source, container = "/dev/video0", "v4l2"
    player = MediaPlayer(source, format=container, options=capture_options)

    await peer.setRemoteDescription(remote_offer)

    # Attach a local track for every transceiver kind the player provides.
    for transceiver in peer.getTransceivers():
        if transceiver.kind == "audio":
            track = player.audio
        elif transceiver.kind == "video":
            track = player.video
        else:
            continue
        if track:
            peer.addTrack(track)

    local_answer = await peer.createAnswer()
    await peer.setLocalDescription(local_answer)

    answer_node = db.child(uid).child(DEVICE).child(SECURITY).child(SDP)
    answer_node.set(
        {
            "sdp": peer.localDescription.sdp,
            "type": peer.localDescription.type,
        },
        user["idToken"],
    )
    log("SDP", "answer sent")
# close all peer connections in WebRTC
async def close_all_connections():
    """Close every tracked peer connection, then empty the `pcs` set.

    All close() coroutines are awaited together; the set is only cleared
    once they have all finished.
    """
    closing = []
    for connection in pcs:
        closing.append(connection.close())
    await asyncio.gather(*closing)
    pcs.clear()
# run coroutine in daemon thread
def run_coro_in_thread(coro):
    """Schedule `coro` on a background event loop and return immediately.

    A single event loop, running forever in one daemon thread, is created on
    the first call and reused afterwards. Previously every call started a new
    loop and a new thread that were never stopped, so each scheduled
    coroutine leaked one thread for the lifetime of the process.

    coro -- the coroutine object to run.

    Returns the concurrent.futures.Future produced by
    asyncio.run_coroutine_threadsafe(); callers that do not care about the
    result can ignore it, as before.

    NOTE(review): the lazy creation is not lock-protected; this assumes the
    function is only called from one thread at a time - confirm.
    """
    loop = getattr(run_coro_in_thread, "_loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        Thread(target=loop.run_forever, daemon=True).start()
        # Cached on the function object so no extra module global is needed.
        run_coro_in_thread._loop = loop
    return asyncio.run_coroutine_threadsafe(coro, loop)
# handle Firebase security database stream
def stream_handler(message):
    """React to one event from the security node's database stream.

    message -- mapping with "event", "path" (relative to the streamed node)
               and "data" (the value written at that path).

    Only "put" events are acted on. Handled paths:
      "/"               dict that may hold "sdp", "control", "displayQR"
      "/sdp"            session description; answered when it is an offer
      "/control"        dict that may hold "alarm" and "lock"
      "/control/alarm"  value passed to setAlarm()
      "/control/lock"   value passed to setLock()
      "/displayQR"      value passed to displayQR()
    Anything else is ignored. Missing (None) or wrongly shaped sdp/control
    values are ignored instead of raising.
    """
    if message["event"] != "put":
        return

    path = message["path"]
    data = message["data"]

    def on_sdp(value):
        # Only complete offers are answered. answer() writes its reply to
        # the SDP child of the same node, so non-offer descriptions and a
        # deleted node (None) can also arrive here and must be skipped.
        if (isinstance(value, dict)
                and value.get("type") == "offer"
                and "sdp" in value):
            run_coro_in_thread(answer(value))

    def on_control(value):
        if not isinstance(value, dict):
            return
        for name, state in value.items():
            if name == "alarm":
                setAlarm(state)
            elif name == "lock":
                setLock(state)

    handlers = {
        "sdp": on_sdp,
        "control": on_control,
        "displayQR": displayQR,
    }

    # Normalise every supported path to the shape of a root-level write so
    # that one dispatch loop serves both whole-node and single-leaf updates.
    if path == "/":
        changes = data
    elif path in ("/control/alarm", "/control/lock"):
        changes = {"control": {path.rsplit("/", 1)[1]: data}}
    else:
        changes = {path.lstrip("/"): data}

    if not isinstance(changes, dict):
        return  # e.g. the whole node was deleted (data is None)

    for key, value in changes.items():
        handler = handlers.get(key)
        if handler is not None:
            handler(value)
# stream for Firebase security database changes
def security_db_stream():
    """Open a stream on this device's security node.

    Events are delivered to stream_handler(); the request is authenticated
    with the current user's ID token. Returns the stream object so the
    caller can close it.
    """
    node = db.child(uid).child(DEVICE).child(SECURITY)
    stream = node.stream(stream_handler, user["idToken"])
    return stream
# refresh login token every 59 minutes
def refresh_token():
    """Keep the Firebase session alive; never returns.

    Every 59 minutes the module-level `user` is replaced with the result of
    auth.refresh(), so code that later reads user["idToken"] sees the new
    token.

    NOTE(review): the stream opened by security_db_stream() was given the
    ID token that was current when it was created and is not reopened here -
    confirm whether it keeps working after the original token expires.
    """
    # Without this declaration the assignment below makes `user` local and
    # reading user["refreshToken"] raises UnboundLocalError.
    global user
    while True:
        sleep(59 * 60)
        user = auth.refresh(user["refreshToken"])
if __name__ == "__main__":
    # Start the database stream, then block in the token-refresh loop until
    # the process is interrupted or fails.
    s = None
    try:
        log("PROG", "program started")
        s = security_db_stream()
        refresh_token()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the program.
        pass
    finally:
        # Clean up on every exit path, not only on Ctrl+C, and make sure the
        # stream is closed even if closing the peer connections fails.
        try:
            asyncio.run(close_all_connections())
        finally:
            # `s` is still None if the interrupt arrived before the stream
            # was opened.
            if s is not None:
                s.close()
            log("PROG", "program exited")