-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfridge_mqtt.py
142 lines (113 loc) · 3.46 KB
/
fridge_mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python3
'''fridge_mqtt.py
Monitors a given Alpicool compatible fridge and sends
status updates to an MQTT broker
'''
import asyncio
import argparse
import logging
import sys
from typing import Optional
import paho.mqtt.client as mqtt
from fridge import Fridge, FridgeData
def publish_offline(mqttc: mqtt.Client, addr: str):
'''Publish online=false to the MQTT broker'''
mqttc.publish(f"fridge/{addr}/online", False)
def publish_status(mqttc: mqtt.Client,
addr: str,
data: FridgeData,
previous_data: Optional[FridgeData]
):
'''Publish the current fridge status to the MQTT broker'''
if previous_data is None:
mqttc.publish(f"fridge/{addr}/online", True)
if previous_data is None or data != previous_data:
info = data.to_dict()
mqttc.publish(f'fridge/{addr}/state', info)
async def run(addr: str,
bind: bool,
poll: bool,
pollinterval: int,
mqttc: mqtt.Client
):
'''Run the write-notify loop'''
# pylint: disable=R0801
async with Fridge(addr) as fridge:
if bind:
await asyncio.wait_for(fridge.bind(), 30)
try:
query_response = await asyncio.wait_for(fridge.query(), 5)
except TimeoutError:
pass
else:
publish_status(mqttc, addr, query_response, None)
last_status = query_response
while poll:
await asyncio.sleep(pollinterval)
try:
await asyncio.wait_for(fridge.query(), 5)
except TimeoutError:
if last_status is not None:
publish_offline(mqttc, addr)
last_status = None
else:
publish_status(mqttc, addr, query_response, last_status)
last_status = query_response
def main():
'''fridge_mqtt.py entry point when run as a script'''
# pylint: disable=R0801
parser = argparse.ArgumentParser(
prog='fridge_mqtt.py',
description='Fridge monitor for Alpicool / Brass Monkey fridges'
)
parser.add_argument(
'address',
help='Bluetooth address of fridge'
)
parser.add_argument(
'-b',
'--bind',
action='store_true',
help='Press settings button on fridge to confirm fridge selection'
)
parser.add_argument(
'-l',
'--loop',
action='store_true',
help='Poll at regular intervals (default: query once)'
)
parser.add_argument(
'-t',
'--pollinterval',
type = int,
default=10,
help='Poll interval in seconds (default: 10)'
)
parser.add_argument(
'-h',
'--mqtt-host',
type = str,
required=True,
help='MQTT host name / ip address'
)
parser.add_argument(
'-p',
'--mqtt-port',
type = int,
default = 1883,
help='MQTT port'
)
args = parser.parse_args()
logging.basicConfig()
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.connect(args.mqtt_host, args.mqtt_port, 60)
mqttc.loop_start()
try:
asyncio.run(run(args.address, args.bind, args.loop, args.pollinterval, mqttc))
except KeyboardInterrupt:
sys.stderr.write('Exiting\n')
finally:
publish_offline(mqttc, args.address)
mqttc.loop_stop()
if __name__ == '__main__':
main()