-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathEdgeImpulse.py
136 lines (122 loc) · 4.96 KB
/
EdgeImpulse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# EIAPI Class
# uploader = EIAPI(hmac, api, ssid, password)
# uploader.sendValues([[15], [18], [123]], 10000)
# uploader.sendValues([[15, 18, 123], [16, 19, 124], [17, 20, 125] ], 10)
# one of the assumptions is that the class is handling the network connectivity
# this works in my use case but I'm open to devising an alternate constructor
# or similar
# This depends on hmac being available ( typically as /lib/hmac.mpy on the device )
# see https://github.com/cameronbunce/ESP32-Edge-Impulse/blob/main/README.md for details
# Public methods
# EIAPI.sendValues(values, interval_ms) to upload one sensor's data
# EIAPI.offline() to return radio to a lower power state
# EIAPI.getMessage() to determine the failure mode, will contain all error messages
# since last call of getMessage(),
# such as "SSID Not Found"
# or non-http200 responses from upload attempts
# Newest will be last (messages are appended in the order they occur)
# Messages are cleared once returned with getMessage(),
# EIAPI.readMessage() does not clear, but returns the same as above
# EIAPI.online() is not strictly necessary, as sendValues calls it
# To Do
# - create a generic sendValues that takes the sensor value:
# "sensors": [
# { "name": "Temperature", "units": "Celsius" }
# ],
# as input
# - write messages to file for persistence
import network
class EIAPI:
    """Upload sensor readings to the Edge Impulse ingestion service.

    The class owns the WiFi station interface: it brings the radio up on
    demand, signs each payload with the project's HMAC key and posts it to
    the ingestion endpoint. Failures are recorded as text messages that are
    kept in memory and mirrored to MESSAGE_FILE, one message per line.
    """

    # File used to persist the message log (relative to the working directory).
    MESSAGE_FILE = 'message.txt'
    INGESTION_URL = 'https://ingestion.edgeimpulse.com/api/training/data'

    def __init__(self, hmac_key, api_key, ssid, password):
        """Store credentials, reload any persisted messages and read the MAC.

        hmac_key -- Edge Impulse HMAC key (str) used to sign payloads
        api_key  -- Edge Impulse API key sent in the x-api-key header
        ssid     -- name of the WiFi network to join (str)
        password -- WiFi password
        """
        import ubinascii
        self.wlan = network.WLAN(network.STA_IF)
        self.hmac = hmac_key
        self.api = api_key
        self.ssid = ssid
        self.password = password
        # EAFP: a missing or unreadable log file simply means "no messages".
        # This also keeps the existence check and the open on the same path.
        try:
            with open(self.MESSAGE_FILE) as filein:
                self.message = [line.rstrip('\n') for line in filein if line.strip()]
        except OSError:
            self.message = []
        self.mac = ubinascii.hexlify(self.wlan.config('mac')).decode()

    def getMessage(self):
        """Return all recorded messages (oldest first) and clear the log."""
        messageOut = self.message
        self.message = []
        if messageOut:
            # Clear the persisted copy too, otherwise the returned messages
            # would come back after the next restart.
            self.__persist()
        return messageOut

    def readMessage(self):
        """Return the recorded messages without clearing them."""
        return self.message

    def __persist(self):
        """Rewrite MESSAGE_FILE with the current messages, one per line."""
        # NOTE: a message that itself contains a newline will be read back as
        # several messages after a restart.
        with open(self.MESSAGE_FILE, 'w') as fileout:
            fileout.writelines([entry + '\n' for entry in self.message])

    def __addMessage(self, update):
        """Append a message to the tail of the log and persist the log."""
        if isinstance(update, (bytes, bytearray)):
            # HTTP response bodies arrive as bytes; store text so the
            # in-memory list and the reloaded file hold the same type.
            try:
                update = bytes(update).decode()
            except UnicodeError:
                update = str(update)
        elif not isinstance(update, str):
            update = str(update)
        self.message.append(update)
        self.__persist()

    def online(self, timeout_ms=30000):
        """Connect to the configured WiFi network if not already connected.

        timeout_ms -- how long to wait for the connection to come up.

        Returns True when connected. Returns False (and records a message)
        when the SSID is not visible or the connection does not come up
        within timeout_ms, e.g. because of a wrong password.
        """
        import time
        if self.wlan.isconnected():
            return True
        self.wlan.active(True)
        # scan() reports SSIDs as bytes; compare as bytes so an undecodable
        # SSID from some other access point cannot break the lookup.
        wanted = self.ssid.encode()
        if not any(found[0] == wanted for found in self.wlan.scan()):
            self.__addMessage("SSID Not Found")
            return False
        self.wlan.connect(self.ssid, self.password)
        started = time.ticks_ms()
        while not self.wlan.isconnected():
            if time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
                self.__addMessage("Connection Timed Out")
                return False
            time.sleep_ms(100)
        return True

    def offline(self):
        """Deactivate the WiFi interface to return the radio to a lower power state."""
        self.wlan.active(False)

    def __buildSignedBody(self, values, interval_ms):
        """Return the JSON request body for values, including its HMAC signature."""
        import time, json, hmac, hashlib
        # Placeholder signature (all zeros). HS256 gives a 32 byte signature,
        # hex encoded, so 64 characters are needed here.
        emptySignature = '0' * 64
        data = {
            "protected": {
                "ver": "v1",
                "alg": "HS256",
                "iat": time.time()  # timestamp taken from the device clock
            },
            "signature": emptySignature,
            "payload": {
                "device_name": self.mac,
                "device_type": "ESP32-DS18B20",
                "interval_ms": interval_ms,
                "sensors": [
                    {"name": "Temperature", "units": "Celsius"}
                ],
                "values": values
            }
        }
        # The signature is calculated over the document with the placeholder...
        unsigned = json.dumps(data)
        data['signature'] = hmac.new(bytes(self.hmac, 'utf-8'),
                                     msg=unsigned.encode('utf-8'),
                                     digestmod=hashlib.sha256).hexdigest()
        # ...and the document must be encoded again so that the body that is
        # sent actually carries the signature instead of the zeros.
        return json.dumps(data)

    def __packandship(self, values, interval_ms):
        """Sign and post values; return True on HTTP 200.

        Any other status records the response body as a message and returns
        False. Network errors raised by urequests are not caught here.
        """
        import urequests
        res = urequests.post(url=self.INGESTION_URL,
                             data=self.__buildSignedBody(values, interval_ms),
                             headers={
                                 'Content-Type': 'application/json',
                                 'x-file-name': 'DS18B20Long',
                                 'x-api-key': self.api
                             })
        try:
            if res.status_code == 200:
                return True
            self.__addMessage(res.content)
            return False
        finally:
            # Always release the socket held by the response.
            res.close()

    def sendValues(self, values, interval_ms):
        """Upload one sensor's readings; return True on success.

        values      -- list of samples, each sample a list of readings
        interval_ms -- time between samples in milliseconds
        """
        if not self.online():
            self.__addMessage("Not Online")
            return False
        return self.__packandship(values, interval_ms)