-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDeviceHandler.py
79 lines (67 loc) · 3 KB
/
DeviceHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# device_handler.py
import asyncio
from pupil_labs.realtime_api import Device, StatusUpdateNotifier
from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
import time
# I ip 192.168.0.253
# E ip 192.168.0.13
class DeviceHandler:
def __init__(self, dev_info):
self.dev_info = dev_info
self.device = None
self.status = None
self.is_recording = False
self.record_button = None
self.connected = True # Add a connection status flag
self.name = None
async def init_device(self):
try:
self.device = Device.from_discovered_device(self.dev_info)
self.status = await self.device.get_status()
except ConnectionRefusedError:
print(f"Connection refused by device {self.dev_info['name']}")
self.connected = False # Update the connection status flag
self.status = 'DISCONNECTED'
async def get_info(self):
if self.connected:
name = self.status.phone.device_name
ip = self.status.phone.ip
battery = self.status.phone.battery_level
glass = self.status.hardware.glasses_serial
return [name, ip, battery, glass]
else:
return [f'{self.dev_info["name"]} (DISCONNECTED)', 'N/A', 'N/A', 'N/A']
async def start_recording(self):
if not self.connected:
print(f"Can't start recording, device {self.dev_info['name']} is disconnected")
return
# Create notifier to get update when recording is fully started
notifier = StatusUpdateNotifier(self.device, callbacks=[self.print_recording])
# Start receiving updates
await notifier.receive_updates_start()
# Start recording
recording_id = await self.device.recording_start()
print(f"Initiated recording with id {recording_id}")
async def stop_recording(self):
if not self.connected:
print(f"Can't stop recording, device {self.dev_info['name']} is disconnected")
return
await self.device.recording_stop_and_save()
await asyncio.sleep(2) # wait for confirmation via auto-update
async def send_message(self, message, u_time):
if not self.connected:
print(f"Can't send message, device {self.dev_info['name']} is disconnected")
return
# Send a message
try:
time_offset_estimator = TimeOffsetEstimator(self.status.phone.ip, self.status.phone.time_echo_port)
estimate = await time_offset_estimator.estimate()
u_time_offset = estimate.time_offset_ms.mean * 1000000 # Convert MS to NS
newtime = u_time - u_time_offset
await self.device.send_event(f'{message} o:{u_time_offset} t:{u_time}', event_timestamp_unix_ns=newtime)
# print(event)
except:
print(self.status.phone.device_name, ' Not found')
@staticmethod
def print_recording(status):
print("Recording: ", status.recording.rec_duration_ns)