-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
186 lines (157 loc) · 6.28 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import configparser
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from config.config_loader import read_all_powermeter_configs, ClientFilter
from ct001 import CT001
from powermeter import Powermeter
from shelly import Shelly
from collections import OrderedDict
def test_powermeter(powermeter, client_filter):
try:
print("Testing powermeter configuration...")
powermeter.wait_for_message(timeout=120)
value = powermeter.get_powermeter_watts()
value_with_units = " | ".join([f"{v}W" for v in value])
powermeter_name = powermeter.__class__.__name__
filter_description = ", ".join([str(n) for n in client_filter.netmasks])
print(
f"Successfully fetched {powermeter_name} powermeter value (filter {filter_description}): {value_with_units}"
)
except Exception as e:
print(f"Error: {e}")
exit(1)
def run_device(
device_type: str,
cfg: configparser.ConfigParser,
args: argparse.Namespace,
powermeters: list[(Powermeter, ClientFilter)],
device_id: Optional[str] = None,
):
print(f"Starting device: {device_type}")
if device_type == "ct001":
disable_sum = (
args.disable_sum
if args.disable_sum is not None
else cfg.getboolean("GENERAL", "DISABLE_SUM_PHASES", fallback=False)
)
disable_absolute = (
args.disable_absolute
if args.disable_absolute is not None
else cfg.getboolean("GENERAL", "DISABLE_ABSOLUTE_VALUES", fallback=False)
)
poll_interval = (
args.poll_interval
if args.poll_interval is not None
else cfg.getint("GENERAL", "POLL_INTERVAL", fallback=1)
)
print(f"CT001 Settings for {device_id}:")
print(f"Disable Sum Phases: {disable_sum}")
print(f"Disable Absolute Values: {disable_absolute}")
print(f"Poll Interval: {poll_interval}")
device = CT001(poll_interval=poll_interval)
def update_readings(addr):
powermeter = None
for pm, client_filter in powermeters:
if client_filter.matches(addr[0]):
powermeter = pm
break
if powermeter is None:
print(f"No powermeter found for client {addr[0]}")
device.value = None
return
values = powermeter.get_powermeter_watts()
value1 = values[0] if len(values) > 0 else 0
value2 = values[1] if len(values) > 1 else 0
value3 = values[2] if len(values) > 2 else 0
if not disable_sum:
value1 = value1 + value2 + value3
value2 = value3 = 0
if not disable_absolute:
value1, value2, value3 = map(abs, (value1, value2, value3))
device.value = [value1, value2, value3]
device.before_send = update_readings
elif device_type == "shellypro3em":
print(f"Shelly Pro 3EM Settings:")
print(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=1010)
elif device_type == "shellyemg3":
print(f"Shelly EM Gen3 Settings:")
print(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2222)
elif device_type == "shellyproem50":
print(f"Shelly Pro EM 50 Settings:")
print(f"Device ID: {device_id}")
device = Shelly(powermeters=powermeters, device_id=device_id, udp_port=2223)
else:
raise ValueError(f"Unsupported device type: {device_type}")
try:
device.start()
device.join()
finally:
device.stop()
def main():
parser = argparse.ArgumentParser(description="Power meter device emulator")
parser.add_argument(
"-c", "--config", default="config.ini", help="Path to the configuration file"
)
parser.add_argument("-t", "--skip-powermeter-test", type=bool)
parser.add_argument(
"-d",
"--device-types",
nargs="+",
choices=["ct001", "shellypro3em", "shellyemg3", "shellyproem50"],
help="List of device types to emulate",
)
parser.add_argument("--device-ids", nargs="+", help="List of device IDs")
# B2500-specific arguments
parser.add_argument("-s", "--disable-sum", type=bool)
parser.add_argument("-a", "--disable-absolute", type=bool)
parser.add_argument("-p", "--poll-interval", type=int)
args = parser.parse_args()
cfg = configparser.ConfigParser(dict_type=OrderedDict)
cfg.read(args.config)
# Load general settings
device_types = (
args.device_types
if args.device_types is not None
else [
dt.strip()
for dt in cfg.get("GENERAL", "DEVICE_TYPE", fallback="ct001").split(",")
]
)
skip_test = (
args.skip_powermeter_test
if args.skip_powermeter_test is not None
else cfg.getboolean("GENERAL", "SKIP_POWERMETER_TEST", fallback=False)
)
device_ids = args.device_ids if args.device_ids is not None else []
# Fill missing device IDs with default format
while len(device_ids) < len(device_types):
device_type = device_types[len(device_ids)]
if device_type in ["shellypro3em", "shellyemg3", "shellyproem50"]:
device_ids.append(f"{device_type}-ec4609c439c{len(device_ids) + 1}")
else:
device_ids.append(f"device-{len(device_ids) + 1}")
print(f"Device Types: {device_types}")
print(f"Device IDs: {device_ids}")
print(f"Skip Test: {skip_test}")
# Create powermeter
powermeters = read_all_powermeter_configs(cfg)
if not skip_test:
for powermeter, client_filter in powermeters:
test_powermeter(powermeter, client_filter)
# Run devices in parallel
with ThreadPoolExecutor(max_workers=len(device_types)) as executor:
futures = []
for device_type, device_id in zip(device_types, device_ids):
futures.append(
executor.submit(
run_device, device_type, cfg, args, powermeters, device_id
)
)
# Wait for all devices to complete
for future in futures:
future.result()
if __name__ == "__main__":
main()