forked from IRNAS/ppk2-api-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpower_profiler.py
182 lines (150 loc) · 6.79 KB
/
power_profiler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import time
import csv
import datetime
from threading import Thread
# import numpy as np
# import matplotlib.pyplot as plt
# import matplotlib
from src.ppk2_api import PPK2_API
class PowerProfiler():
def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
"""Initialize PPK2 power profiler with serial"""
self.measuring = None
self.measurement_thread = None
self.ppk2 = None
try:
if serial_port:
self.ppk2 = PPK2_API(serial_port)
else:
serial_port = self.discover_port()
if serial_port:
self.ppk2 = PPK2_API(serial_port)
ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct
except Exception as e:
ret = None
raise e
if not ret:
self.ppk2 = None
#raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}")
else:
self.ppk2.use_source_meter()
self.source_voltage_mV = source_voltage_mV
self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V
self.measuring = False
self.current_measurements = []
# local variables used to calculate power consumption
self.measurement_start_time = None
self.measurement_stop_time = None
time.sleep(1)
self.stop = False
self.measurement_thread = Thread(target=self.measurement_loop, daemon=True)
self.measurement_thread.start()
# write to csv
self.filename = filename
if self.filename is not None:
with open(self.filename, 'w', newline='') as file:
writer = csv.writer(file)
row = []
for key in ["ts", "avg1000"]:
row.append(key)
writer.writerow(row)
def write_csv_rows(self, samples):
"""Write csv row"""
with open(self.filename, 'a', newline='') as file:
writer = csv.writer(file)
for sample in samples:
row = [datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S.%f'), sample]
writer.writerow(row)
def delete_power_profiler(self):
"""Join thread"""
self.measuring = False
self.stop = True
if self.measurement_thread:
self.measurement_thread.join()
self.measurement_thread = None
if self.ppk2:
self.disable_power()
def discover_port(self):
"""Discovers ppk2 serial port"""
ppk2s_connected = PPK2_API.list_devices()
if(len(ppk2s_connected) == 1):
ppk2_port = ppk2s_connected[0]
print(f'Found PPK2 at {ppk2_port}')
return ppk2_port
else:
print(f'Too many connected PPK2\'s: {ppk2s_connected}')
return None
def enable_power(self):
"""Enable ppk2 power"""
if self.ppk2:
self.ppk2.toggle_DUT_power("ON")
return True
return False
def disable_power(self):
"""Disable ppk2 power"""
if self.ppk2:
self.ppk2.toggle_DUT_power("OFF")
return True
return False
def measurement_loop(self):
"""Endless measurement loop will run in a thread"""
while True and not self.stop:
if self.measuring: # read data if currently measuring
read_data = self.ppk2.get_data()
if read_data != b'':
#samples = self.ppk2.get_samples(read_data)
samples = self._average_samples(self.ppk2.get_samples(read_data), 1024) # optionally average samples
self.current_measurements += samples # can easily sum lists, will append individual data
time.sleep(0.001) # TODO figure out correct sleep duration
def _average_samples(self, list, window_size):
"""Average samples based on window size"""
chunks = [list[val:val + window_size] for val in range(0, len(list), window_size)]
avgs = []
for chunk in chunks:
avgs.append(sum(chunk) / len(chunk))
return avgs
def start_measuring(self):
"""Start measuring"""
if not self.measuring: # toggle measuring flag only if currently not measuring
self.current_measurements = [] # reset current measurements
self.ppk2.start_measuring() # send command to ppk2
self.measuring = True # set internal flag
self.measurement_start_time = time.time()
def stop_measuring(self):
"""Stop measuring and return average of period"""
self.measurement_stop_time = time.time()
self.measuring = False
self.ppk2.stop_measuring() # send command to ppk2
#samples_average = self._average_samples(self.current_measurements, 1000)
if self.filename is not None:
self.write_csv_rows(self.current_measurements)
def get_min_current_mA(self):
return min(self.current_measurements) / 1000
def get_max_current_mA(self):
return max(self.current_measurements) / 1000
def get_average_current_mA(self):
"""Returns average current of last measurement in mA"""
if len(self.current_measurements) == 0:
return 0
average_current_mA = (sum(self.current_measurements) / len(self.current_measurements)) / 1000 # measurements are in microamperes, divide by 1000
return average_current_mA
def get_average_power_consumption_mWh(self):
"""Return average power consumption of last measurement in mWh"""
average_current_mA = self.get_average_current_mA() # convert microamperes to milliamperes
average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
measurement_duration_h = self.get_measurement_duration_s() / 3600 # duration in seconds, divide by 3600 to get hours
average_consumption_mWh = average_power_mW * measurement_duration_h
return average_consumption_mWh
def get_average_charge_mC(self):
"""Returns average charge in milli coulomb"""
average_current_mA = self.get_average_current_mA()
measurement_duration_s = self.get_measurement_duration_s() # in seconds
return average_current_mA * measurement_duration_s
def get_measurement_duration_s(self):
"""Returns duration of measurement"""
measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds
return measurement_duration_s
# pp = PowerProfiler("/dev/ttyACM1")
# pp.start_measuring()
# time.sleep(10)
# pp.stop_measuring()