-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathmqtt_test.py
executable file
·193 lines (159 loc) · 6.68 KB
/
mqtt_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/python
#
# Copyright (c) 2020-2023 Rich Bell <[email protected]>
#
# See the file LICENSE.txt for your full rights.
#
""" A simple utility that prints the topics and payloads. """
from __future__ import print_function
import argparse
import os
import random
import time
import configobj
import paho.mqtt.client as mqtt
USAGE = """mqtt_test --help
mqtt_test [CONFIG_FILE]
[--type=[driver|service]]
[--records=MAX_RECORDS]
[--host=HOST]
[--port=PORT]
[--keepalive=KEEPALIVE]
[--clientid=CLIENTID]
[--username=USERNAME]
[--password=PASSWORD]
[--topics=TOPIC1,TOPIC2]
[--quiet]
A simple utility that prints the topics and payloads.
Configuration can be read from a 'weewx.conf' file or passed in.
Command line options override any options in the file.
"""
def on_log(_client, _userdata, level, msg):
""" MQTT logging callback. """
log_level = {
mqtt.MQTT_LOG_INFO: 'MQTT_LOG_INFO',
mqtt.MQTT_LOG_NOTICE: 'MQTT_LOG_NOTICE',
mqtt.MQTT_LOG_WARNING: 'MQTT_LOG_WARNING',
mqtt.MQTT_LOG_ERR: 'MQTT_LOG_ERR',
mqtt.MQTT_LOG_DEBUG: 'MQTT_LOG_DEBUG'
}
print(f"{log_level[level]}: {msg}")
def on_connect(client, userdata, _flags, rc):
""" MQTT on connect callback. """
print(f"Connected with result code {int(rc)}")
for topic in userdata['topics']:
client.subscribe(topic, qos=userdata['qos'])
def on_disconnect(_client, _userdata, rc):
""" MQTT on disconnect callback. """
print(f"Disconnected with result code {int(rc)}")
def on_subscribe(_client, _userdata, mid, granted_qos):
""" MQTT on subscribe callback. """
print(f"Subscribed to mid: {int(mid)} is size {len(granted_qos)} has a QOS of {int(granted_qos[0])}")
def on_message(client, userdata, msg):
""" MQTT on message callback. """
print(f'({int(time.time())}) mid:{msg.mid}, qos:{msg.qos}, {msg.topic}: {msg.payload}')
if userdata.get('max_records'):
userdata['counter'] += 1
if userdata['counter'] >= userdata['max_records']:
client.disconnect()
def init_parser():
""" Parse the command line arguments. """
parser = argparse.ArgumentParser(usage=USAGE)
parser.add_argument("--type", choices=["driver", "service"],
help="This contols what configuration section, [MQTTSubscribeDriver] or [MQTTSubscribeDriver], is read. ",
default="driver")
parser.add_argument('--records', dest='max_records', type=int,
help='The number of MQTT records to retrieve.')
parser.add_argument("--host",
help="The MQTT server.")
parser.add_argument('--port', dest='port', type=int,
help='The port to connect to.')
parser.add_argument('--keepalive', dest='keepalive', type=int,
help='Maximum period in seconds allowed between communications with the broker.')
parser.add_argument("--clientid",
help="The clientid to connect with.")
parser.add_argument("--persist", action="store_true", dest="persist",
help="Set up a persistence session (clean_session=false)")
parser.add_argument("--username",
help="username for broker authentication.")
parser.add_argument("--password",
help="password for broker authentication.")
parser.add_argument("--qos", default=0, type=int,
help="QOS desired. Currently one specified for all topics")
parser.add_argument("--topics",
help="Comma separated list of topics to subscribe to.")
parser.add_argument("--quiet", action="store_true", dest="quiet",
help="Turn off the MQTT logging.")
parser.add_argument("config_file", nargs="?")
return parser
def _get_option(option, default):
if option:
return option
return default
def main():
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
""" The main entry point. """
parser = init_parser()
options = parser.parse_args()
if options.type == 'service':
config_type = 'MQTTSubscribeService'
else:
config_type = 'MQTTSubscribeDriver'
max_records = _get_option(options.max_records, None)
if options.config_file:
config_path = os.path.abspath(options.config_file)
configuration = configobj.ConfigObj(config_path, file_error=True)
config_dict = configuration.get(config_type, {})
print(f"Reading configuration file, {config_path}.")
print(f"Using section [{config_type}] of the configuration file.")
else:
config_dict = {}
host = _get_option(options.host, config_dict.get('host', 'localhost'))
port = _get_option(options.port, int(config_dict.get('port', 1883)))
keepalive = _get_option(options.keepalive, int(config_dict.get('keepalive', 60)))
clientid = _get_option(options.clientid, config_dict.get('clientid', config_type + '-' + str(random.randint(1000, 9999))))
username = _get_option(options.username, config_dict.get('username', None))
# todo cleanup, so that not always overriding config
clean_session = not options.persist
qos = options.qos
password = _get_option(options.password, config_dict.get('password', None))
topics = []
if 'topic' in config_dict:
topics.append(config_dict['topic'])
else:
if 'topics' in config_dict:
for topic in config_dict['topics']:
topics.append(topic)
if options.topics:
topics = options.topics.split(',')
print(f"Host is {host}")
print(f"Port is {port}")
print(f"Keep alive is {keepalive}")
print(f"Client id is {clientid}")
print(f"Clean session is {clean_session}")
print(f"Username is {username}")
print(f"Password is {password}")
print(f"QOS is {qos}")
print(f"Topics are {topics}")
if password is not None:
print("Password is set")
else:
print("Password is not set")
userdata = {}
userdata['qos'] = qos
userdata['topics'] = topics
if max_records:
userdata['counter'] = 0
userdata['max_records'] = max_records
client = mqtt.Client(client_id=clientid, userdata=userdata, clean_session=clean_session)
if not options.quiet:
client.on_log = on_log
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
client.on_message = on_message
if username is not None and password is not None:
client.username_pw_set(username, password)
client.connect(host, port, keepalive)
client.loop_forever()
main()