-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathadafruit_aws_iot.py
318 lines (252 loc) · 11 KB
/
adafruit_aws_iot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# SPDX-FileCopyrightText: 2019 Brent Rubell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_aws_iot`
================================================================================
Amazon AWS IoT MQTT Client for CircuitPython
* Author(s): Brent Rubell
Implementation Notes
--------------------
**Hardware:**
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
"""
import json
from adafruit_minimqtt.adafruit_minimqtt import MMQTTException
try:
from typing import Optional, Type, Union
from types import TracebackType
from adafruit_minimqtt.adafruit_minimqtt import MQTT
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_AWS_IOT.git"
class AWS_IOT_ERROR(Exception):
    """Error type raised by this module when an AWS IoT MQTT operation fails.

    Behaves exactly like :class:`Exception`; it exists only so callers can
    catch failures coming from this library specifically.
    """
class MQTT_CLIENT:
    """Client for interacting with the Amazon AWS IoT MQTT API.

    Wraps a pre-configured MiniMQTT client: it installs its own handlers on the
    MiniMQTT callbacks and forwards each event to the matching user-assignable
    attribute (``on_connect``, ``on_disconnect``, ``on_message``,
    ``on_subscribe``, ``on_unsubscribe``) when that attribute is not ``None``.

    :param ~MQTT.MQTT mmqttclient: Pre-configured MiniMQTT Client object.
    :param int keep_alive: Optional Keep-alive timer interval, in seconds.
        Provided interval must be 30 <= keep_alive <= 1200.
    :raises TypeError: If ``mmqttclient`` does not look like a MiniMQTT client,
        or its ``client_id`` is missing, empty, or starts with ``$``.
    :raises AssertionError: If ``keep_alive`` is outside the allowed range.
    """

    # Annotations that name symbols from the optional (try/except ImportError)
    # typing imports are written as strings so that defining this class never
    # requires those names to exist at runtime.

    def __init__(self, mmqttclient: "MQTT", keep_alive: int = 30) -> None:
        # Duck-typed check on the type name; avoids importing the MQTT class.
        if "MQTT" in str(type(mmqttclient)):
            self.client = mmqttclient
        else:
            raise TypeError(
                "This class requires a preconfigured MiniMQTT object, "
                "please create one."
            )
        # Verify MiniMQTT client object configuration. Any failure here
        # (missing attribute, empty ID, restricted prefix) becomes a TypeError.
        try:
            self.cid = self.client.client_id
            assert (
                self.cid[0] != "$"
            ), "Client ID can not start with restricted client ID prefix $."
        except Exception as ex:
            raise TypeError(
                "You must provide MiniMQTT with your AWS IoT Device's Identifier "
                "as the Client ID."
            ) from ex
        # Shadow-interaction topic
        self.shadow_topic = "$aws/things/{}/shadow".format(self.cid)
        # keep_alive timer must be between 30 <= keep alive interval <= 1200 seconds
        # https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html
        assert (
            30 <= keep_alive <= 1200
        ), "Keep_Alive timer interval must be between 30 and 1200 seconds"
        self.keep_alive = keep_alive
        # User-defined MQTT callback methods must be init'd to None
        self.on_connect = None
        self.on_disconnect = None
        self.on_message = None
        self.on_subscribe = None
        self.on_unsubscribe = None
        # Connect MiniMQTT callback handlers
        self.client.on_connect = self._on_connect_mqtt
        self.client.on_disconnect = self._on_disconnect_mqtt
        self.client.on_message = self._on_message_mqtt
        self.client.on_subscribe = self._on_subscribe_mqtt
        self.client.on_unsubscribe = self._on_unsubscribe_mqtt
        self.connected_to_aws = False

    def __enter__(self) -> "MQTT_CLIENT":
        """Return this client so it can be used in a ``with`` statement."""
        return self

    def __exit__(
        self,
        exception_type: "Optional[Type[BaseException]]",
        exception_value: "Optional[BaseException]",
        traceback: "Optional[TracebackType]",
    ) -> None:
        """Disconnect from the broker when leaving the ``with`` block."""
        self.disconnect()

    @property
    def is_connected(self) -> bool:
        """Returns if MQTT_CLIENT is connected to AWS IoT MQTT Broker"""
        return self.connected_to_aws

    def disconnect(self) -> None:
        """Disconnects from Amazon AWS IoT MQTT Broker and de-initializes the MiniMQTT Client.

        Also resets all user-defined callbacks to ``None``.

        :raises AWS_IOT_ERROR: If the MiniMQTT client fails to disconnect.
        """
        try:
            self.client.disconnect()
        except MMQTTException as error:
            raise AWS_IOT_ERROR("Error disconnecting with AWS IoT: ", error) from error
        self.connected_to_aws = False
        # Reset user-defined callback methods to None
        self.on_connect = None
        self.on_disconnect = None
        self.on_message = None
        self.on_subscribe = None
        self.on_unsubscribe = None
        self.client.deinit()

    def reconnect(self) -> None:
        """Reconnects to the AWS IoT MQTT Broker.

        :raises AWS_IOT_ERROR: If the MiniMQTT client fails to reconnect.
        """
        try:
            self.client.reconnect()
        except MMQTTException as error:
            raise AWS_IOT_ERROR("Error re-connecting to AWS IoT:", error) from error

    def connect(self, clean_session: bool = True) -> None:
        """Connects to Amazon AWS IoT MQTT Broker with Client ID.

        :param bool clean_session: Establishes a clean session with AWS broker.
        :raises AWS_IOT_ERROR: If the MiniMQTT client fails to connect.
        """
        try:
            self.client.connect(clean_session)
        except MMQTTException as error:
            raise AWS_IOT_ERROR("Error connecting to AWS IoT: ", error) from error
        self.connected_to_aws = True

    # MiniMQTT Callback Handlers
    # pylint: disable=not-callable, unused-argument
    def _on_connect_mqtt(
        self, client: "MQTT", userdata: str, flag: int, ret_code: int
    ) -> None:
        """Runs when the MiniMQTT client reports a connection.

        Marks this client as connected and forwards to ``on_connect`` if set.

        :param ~MQTT.MQTT client: MiniMQTT client object.
        :param str userdata: User data from broker
        :param int flag: QoS flag from broker.
        :param int ret_code: Return code from broker.
        """
        self.connected_to_aws = True
        # Call the on_connect callback if defined in code
        if self.on_connect is not None:
            self.on_connect(self, userdata, flag, ret_code)

    # pylint: disable=not-callable, unused-argument
    def _on_disconnect_mqtt(
        self, client: "MQTT", userdata: str, flag: int, ret_code: int
    ) -> None:
        """Runs when the MiniMQTT client reports a disconnection.

        Marks this client as disconnected and forwards to ``on_disconnect``
        if set.

        :param ~MQTT.MQTT client: MiniMQTT client object.
        :param str userdata: User data from broker
        :param int flag: QoS flag from broker.
        :param int ret_code: Return code from broker.
        """
        # NOTE(review): confirm the argument count MiniMQTT actually passes to
        # its on_disconnect hook matches this four-argument signature.
        self.connected_to_aws = False
        # Call the on_disconnect callback if defined in code
        if self.on_disconnect is not None:
            self.on_disconnect(self, userdata, flag, ret_code)

    # pylint: disable=not-callable
    def _on_message_mqtt(self, client: "MQTT", topic: str, payload: str) -> None:
        """Runs when the client receives a message; forwards to ``on_message``.

        :param ~MQTT.MQTT client: MiniMQTT client object.
        :param str topic: MQTT broker topic.
        :param str payload: Payload returned by MQTT broker topic
        """
        if self.on_message is not None:
            self.on_message(self, topic, payload)

    # pylint: disable=not-callable
    def _on_subscribe_mqtt(
        self, client: "MQTT", user_data: str, topic: str, qos: int
    ) -> None:
        """Runs when the client subscribes; forwards to ``on_subscribe``.

        :param ~MQTT.MQTT client: MiniMQTT client object.
        :param str user_data: User data from broker
        :param str topic: Desired MQTT topic.
        :param int qos: Quality of service level for topic, from broker.
        """
        if self.on_subscribe is not None:
            self.on_subscribe(self, user_data, topic, qos)

    # pylint: disable=not-callable
    def _on_unsubscribe_mqtt(
        self, client: "MQTT", user_data: str, topic: str, pid: int
    ) -> None:
        """Runs when the client unsubscribes; forwards to ``on_unsubscribe``.

        :param ~MQTT.MQTT client: MiniMQTT client object.
        :param str user_data: User data from broker
        :param str topic: Desired MQTT topic.
        :param int pid: Process ID.
        """
        if self.on_unsubscribe is not None:
            self.on_unsubscribe(self, user_data, topic, pid)

    # MiniMQTT Network Control Flow
    def loop(self, timeout: float = 0) -> None:
        """Starts a synchronous message loop which maintains connection with AWS IoT.
        Must be called within the keep_alive timeout specified to init.
        This method does not handle network connection/disconnection, and does
        nothing while the client is not marked as connected.

        :param float timeout: client return after this timeout, in seconds.

        Example of "pumping" an AWS IoT message loop:

        .. code-block:: python

            while True:
                aws_iot.loop()

        """
        if self.connected_to_aws:
            self.client.loop(timeout)

    @staticmethod
    def validate_topic(topic: str) -> None:
        """Validates if user-provided pub/sub topics adhere to AWS Service Limits.
        https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html

        :param str topic: Desired topic to validate
        :raises AssertionError: If the topic is not string-like, is 256 or more
            characters long, or contains more than 7 forward slashes.
        """
        assert hasattr(topic, "split"), "Topic must be a string"
        assert len(topic) < 256, "Topic must be less than 256 bytes!"
        # Count the slashes directly so the check matches the stated limit
        # (the previous segment-count comparison allowed 8 slashes).
        assert topic.count("/") <= 7, "Topics are limited to 7 forward slashes."

    # MiniMQTT Pub/Sub Methods, for usage with AWS IoT
    def subscribe(self, topic: str, qos: int = 1) -> None:
        """Subscribes to an AWS IoT Topic.

        :param str topic: MQTT topic to subscribe to.
        :param int qos: Desired topic subscription's quality-of-service.
        :raises AssertionError: If ``qos`` is 2 or higher, or the topic is invalid.
        """
        assert qos < 2, "AWS IoT does not support subscribing with QoS 2."
        self.validate_topic(topic)
        self.client.subscribe(topic, qos)

    def publish(
        self, topic: str, payload: "Union[str, int, float, bytes]", qos: int = 1
    ) -> None:
        """Publishes to a AWS IoT Topic.

        :param str topic: MQTT topic to publish to.
        :param payload: Data to publish to topic. Numeric payloads are
            converted to a string using ``str()`` before publishing.
        :type payload: str|int|float|bytes
        :param int qos: Quality of service level for publishing
        :raises AssertionError: If ``qos`` is 2 or higher, or the topic is invalid.
        """
        assert qos < 2, "AWS IoT does not support publishing with QoS 2."
        self.validate_topic(topic)
        # A tuple is required here: ``int or float`` evaluates to just ``int``,
        # which silently skipped the conversion for float payloads.
        if isinstance(payload, (int, float)):
            payload = str(payload)
        self.client.publish(topic, payload, qos=qos)

    # AWS IoT Device Shadow Service
    def shadow_get_subscribe(self, qos: int = 1) -> None:
        """Subscribes to device's shadow get response.

        :param int qos: Optional quality of service level.
        """
        self.client.subscribe(self.shadow_topic + "/get/#", qos)

    def shadow_subscribe(self, qos: int = 1) -> None:
        """Subscribes to all notifications on the device's shadow update topic.

        :param int qos: Optional quality of service level.
        """
        self.client.subscribe(self.shadow_topic + "/update/#", qos)

    def shadow_update(self, document: str) -> None:
        """Publishes a request state document to update the device's shadow.

        :param str document: JSON-formatted state document string.
        """
        self.client.publish(self.shadow_topic + "/update", document)

    def shadow_get(self) -> None:
        """Publishes a request to the shadow get topic to get the device's shadow.

        The payload is a small placeholder JSON object (``{"message": "ignore"}``).
        """
        self.client.publish(
            self.shadow_topic + "/get", json.dumps({"message": "ignore"})
        )

    def shadow_delete(self) -> None:
        """Publishes a request to the shadow delete topic to delete a device's shadow.

        The payload is a small placeholder JSON object (``{"message": "delete"}``).
        """
        self.client.publish(
            self.shadow_topic + "/delete", json.dumps({"message": "delete"})
        )