import paho.mqtt.client as mqttlib import ssl import logging #import json #import time class mqtt: def __init__(self, broker_endpoint, port, client_id=None) -> None: self.broker_endpoint = broker_endpoint self.port = port self.client_id = client_id or None self.mqtt_client = mqttlib.Client(mqttlib.CallbackAPIVersion.VERSION2,self.client_id, protocol=mqttlib.MQTTv5,) def set_log_level(self, level): log = None if str.lower(level) == "debug": log = logging.basicConfig(level=logging.DEBUG) self.log_level = level elif str.lower(level) == "info": log = logging.basicConfig(level=logging.INFO) self.log_level = level elif str.lower(level) == "notset": log = logging.basicConfig(level=logging.NOTSET) self.log_level = level logger= logging.getLogger() file_handler = logging.FileHandler('mqtt_logs.log') file_handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) logger.addHandler(file_handler) self.mqtt_client.enable_logger() def set_sslContext(self): self.context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile="C://Temp//testmqtt//CA1.pem") return self.context def set_tls(self, CA, Cert, keyf, tls_protocol): self.CA = CA self.Cert = Cert self.keyf = keyf if str.lower(tls_protocol) == "client": #self.tls_protocol = ssl.PROTOCOL_TLS_CLIENT self.tls_protocol = ssl.PROTOCOL_TLSv1_2 elif str.lower(tls_protocol) == "server": self.tls_protocol = ssl.PROTOCOL_TLS_SERVER self.mqtt_client.tls_set(self.CA, self.Cert, self.keyf, ssl.CERT_REQUIRED, self.tls_protocol) # None value for Cert_Reqd, until understand tls_set_context() self.mqtt_client.tls_insecure_set(False) def set_mqtt_credentials(self, username, password): self.username = username self.password = password self.mqtt_client.username_pw_set(self.username, self.password) def mqtt_connect(self, keepalive): self.keepalive = keepalive self.mqtt_client.connect(self.broker_endpoint, self.port, self.keepalive) def mqtt_protocol_loop(self): self.mqtt_client.loop_start() def mqtt_publish(self, json_packet, topic): info = self.mqtt_client.publish( topic=topic, payload=json_packet.encode('utf-8'), qos=1 ) info.wait_for_publish() print(info.is_published()) def mqtt_disconnect(self): self.mqtt_client.disconnect()