forked from hashbrowncipher/doppler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalignment.py
36 lines (27 loc) · 1.18 KB
/
alignment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from collections import deque
from numpy import array
from pipe_util import split_fileinput
from pipe_util import join_output
from world_params import CHANNELS
from world_params import SPEED_OF_SOUND_METERS_SECOND
def align(event_stream, channels=None):
    """Group per-microphone peak times into one tuple per sound event.

    Assume that every microphone will see every peak. Then each microphone
    will see the same number of peaks arriving in the same order (unless the
    dart is faster than sound). So keep one FIFO queue of event times per
    microphone, and whenever every queue holds at least one entry, output
    the oldest entry of each queue together.

    Args:
        event_stream: iterable of (channel int, time float) pairs, where
            channel is in range(channels).
        channels: number of microphone channels to align. When None (the
            default), the module-level CHANNELS constant is used.

    Yields:
        A tuple with one entry per channel, ordered by channel index, holding
        the time at which the same event arrived on each channel.

    Raises:
        IndexError: if an event names a channel outside range(channels).
            Because this is a generator, the error surfaces while iterating,
            not when align() is called.
    """
    if channels is None:
        channels = CHANNELS
    queues = tuple(deque() for _ in range(channels))
    for channel, event_t in event_stream:
        # Reject negative channels explicitly: Python's negative indexing
        # would otherwise silently file the event under another microphone.
        if not 0 <= channel < channels:
            raise IndexError(
                'channel %r out of range for %d channels' % (channel, channels))
        # In on the right.
        queues[channel].append(event_t)
        # Only one queue grew on this iteration, so at most one complete set
        # of times can have become available; a single check is enough.
        if all(queues):
            # Out on the left.
            yield tuple(queue.popleft() for queue in queues)
if __name__ == '__main__':
    # Script entry point: feed parsed input events through align() and emit
    # one output record per aligned group.
    # NOTE(review): split_fileinput((int, float)) presumably parses each input
    # record into a (channel, time) pair -- confirm against pipe_util.
    events = split_fileinput((int, float))
    for times in align(events):
        # Scale every per-channel time by the speed of sound.
        # NOTE(review): presumably times are in seconds, giving distances in
        # meters -- confirm against world_params.
        distances = array(times) * SPEED_OF_SOUND_METERS_SECOND
        # Record layout: channel 0's time first, then one distance per channel.
        record = [times[0]]
        record.extend(distances)
        join_output(record)