-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.py
86 lines (72 loc) · 3.27 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# APSUSE Directory Monitor
import os
import glob
import redis
import numpy as np
from time import sleep
from threading import Thread
from sigpyproc.Readers import FilReader
"""
Idea:
Connect clients to FBFUSE and APSUSE to scrape information.
The following is desired:
- FBFUSE beam positions and tiling pattern (with markers for beams that are bein recorded by apsuse)
- APSUSE current recoding directory (currently just subarray_1) [key = array_X_current_recording_directory]
This can be used to create:
- bandpasses for coherent and incoherent beams
- folds for pulsars in both coherent and incoherent beam
q = psrqpy.QueryATNF(params=['JName'], circular_boundary=(opts.ra, opts.dec, opts.radius), assoc=opts.assoc, psrtype=opts.psrtype)
"""
class BandpassGenerator(Thread):
def __init__(self, root_dir, interval=300):
Thread.__init__(self)
self.setDaemon(True)
self._root_dir = root_dir
self._interval = interval
self._coherent_file = None
self._incoherent_file = None
self._redis = redis.StrictRedis("apsuse-monitor-redis")
def generate_bandpass(self, fname):
fil = FilReader(fname)
ar = np.recarray(fil.header.nchans, dtype=[
("frequency", "float32"), ("mean", "float32"), ("std", "float32")])
fil.getStats(gulp=10000)
ar["mean"] = fil.chan_means
ar["std"] = fil.chan_stdevs
ar["frequency"] = np.linspace(
fil.header.fbottom, fil.header.ftop, fil.header.nchans)
return ar
def callback(self):
# first find the most recent directory:
directory = max(glob.glob("/{}/*/*/*/".format(self._root_dir)),
key=os.path.getctime)
coherent_dir = sorted(glob.glob("{}/cfbf*/".format(directory)))[0]
# Take the second to last file as it is guaranteed to be finished writing
coherent_file = sorted(glob.glob("{}/*.fil".format(coherent_dir)))[-2]
incoherent_dir = sorted(glob.glob("{}/ifbf*/".format(directory)))[0]
# Take the second to last file as it is guaranteed to be finished writing
incoherent_file = sorted(glob.glob("{}/*.fil".format(incoherent_dir)))[-2]
self._redis.set("filterbank-directory-monitor:directory", directory)
if coherent_file != self._coherent_file:
self._redis.set("filterbank-directory-monitor:coherent:bandpass",
self.generate_bandpass(coherent_file).tobytes())
self._redis.set("filterbank-directory-monitor:coherent:file",
coherent_file.split("/")[-1])
self._coherent_file = coherent_file
if incoherent_file != self._incoherent_file:
self._redis.set("filterbank-directory-monitor:incoherent:bandpass",
self.generate_bandpass(incoherent_file).tobytes())
self._redis.set("filterbank-directory-monitor:incoherent:file",
incoherent_file.split("/")[-1])
self._incoherent_file = incoherent_file
def run(self):
while True:
try:
self.callback()
except Exception as error:
print("Error: {}".format(str(error)))
finally:
sleep(self._interval)
if __name__ == "__main__":
scraper = BandpassGenerator("/beegfs/DATA/TRAPUM/")
scraper.run()