-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsimplespectral.py
executable file
·343 lines (263 loc) · 10.7 KB
/
simplespectral.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
#!/usr/bin/env python
"""Heavily simplified scipy.signal.spectral module which only depends on NumPy and supports pyFFTW
SimpleSpectral preferably uses pyfftw for FFT calculations, then scipy.fftpack
and numpy.fft as a last resort. Install pyfftw or scipy if possible,
because numpy.fft has horrible memory usage and is also much slower.
"""
import numpy, time, logging, os
__version__ = '1.0.0'
logger = logging.getLogger(__name__)
# You can disable pyFFTW usage by setting use_pyfftw to False
use_pyfftw = True
# You can set number of pyFFTW threads manually by changing fft_threads variable
fft_threads = 1
### FFT functions ###
try:
import pyfftw
fft_pyfftw = True
# Set number of pyFFTW threads to number of CPUs by default
fft_threads = os.cpu_count() or 1
# Enable pyFFTW planner cache
pyfftw.interfaces.cache.enable()
pyfftw.interfaces.cache.set_keepalive_time(3600)
logger.debug('pyfftw module found (using {} threads by default)'.format(fft_threads))
except ImportError:
fft_pyfftw = False
try:
import scipy.fftpack
fft_scipy = True
logger.debug('scipy.fftpack module found')
except ImportError:
fft_scipy = False
if not fft_pyfftw:
logger.warning('Using numpy.fft for FFT calculations, install pyfftw or scipy instead!')
def fft(x, n=None, axis=-1, **kwargs):
"""Compute Fast Fourier Transform (FFT) with pyfftw, scipy.fftpack or numpy.fft"""
if fft_pyfftw and use_pyfftw:
kwargs['threads'] = kwargs.get('threads', fft_threads)
return pyfftw.interfaces.numpy_fft.fft(x, n, axis, **kwargs)
elif fft_scipy:
return scipy.fftpack.fft(x, n, axis, **kwargs)
else:
return numpy.fft.fft(x, n, axis, **kwargs)
### Array functions ###
def empty(shape, dtype='float64', order='C', **kwargs):
"""Empty array with the given shape, dtype, and order"""
if fft_pyfftw and use_pyfftw:
return pyfftw.empty_aligned(shape, dtype, order, **kwargs)
else:
return numpy.empty(shape, dtype, order)
def zeros(shape, dtype='float64', order='C', **kwargs):
"""Array of zeros with the given shape, dtype, and order"""
if fft_pyfftw and use_pyfftw:
return pyfftw.zeros_aligned(shape, dtype, order, **kwargs)
else:
return numpy.zeros(shape, dtype, order)
def ones(shape, dtype='float64', order='C', **kwargs):
"""Array of ones with the given shape, dtype, and order"""
if fft_pyfftw and use_pyfftw:
return pyfftw.ones_aligned(shape, dtype, order, **kwargs)
else:
return numpy.ones(shape, dtype, order)
### Window functions ###
def window_boxcar(M):
"""Return a boxcar window, also known as a rectangular window"""
return numpy.ones(M, float)
def window_tukey(M, alpha=0.5):
"""Return a Tukey window, also known as a tapered cosine window.
The function returns a Hann window for `alpha=0` and a boxcar window for `alpha=1`
"""
if alpha == 0:
return numpy.hann(M)
elif alpha == 1:
return window_boxcar(M)
n = numpy.arange(0, M)
width = int(numpy.floor(alpha * (M - 1) / 2.0))
n1 = n[0:width + 1]
n2 = n[width + 1:M - width - 1]
n3 = n[M - width - 1:]
w1 = 0.5 * (1 + numpy.cos(numpy.pi * (-1 + 2.0 * n1 / alpha / (M - 1))))
w2 = numpy.ones(n2.shape)
w3 = 0.5 * (1 + numpy.cos(numpy.pi * (-2.0 / alpha + 1 + 2.0 * n3 / alpha / (M - 1))))
return numpy.concatenate((w1, w2, w3))
windows = {
'boxcar': window_boxcar,
'tukey': window_tukey,
'hann': numpy.hanning,
'hamming': numpy.hamming,
'bartlett': numpy.bartlett,
'blackman': numpy.blackman,
'kaiser': numpy.kaiser
}
def get_window(window, Nx, fftbins=True):
"""Return a window
(if fftbins=True, generates a periodic window, for use in spectral analysis)
"""
if isinstance(window, tuple):
winstr = window[0]
args = window[1:]
else:
winstr = window
args = []
try:
winfunc = windows[winstr]
except KeyError:
raise ValueError("Unknown window type.")
odd = Nx % 2
if fftbins and not odd:
Nx = Nx + 1
w = winfunc(Nx, *args)
if fftbins and not odd:
w = w[:-1]
return w
### Boundary functions ###
boundaries = {
'even': ('reflect', {'reflect_type': 'even'}),
'odd': ('reflect', {'reflect_type': 'odd'}),
'constant': ('edge', {}),
'zeros': ('constant', {'constant_values': 0}),
}
def extend_boundaries(x, width, boundary):
"""Extend input signal at both ends"""
try:
pad_mode, pad_args = boundaries[boundary]
except KeyError:
raise ValueError('Unknown boundary extension mode.')
return numpy.pad(x, width, pad_mode, **pad_args)
### Detrend functions ###
def detrend_constant(data, axis=-1):
"""Remove trend from data by subtracting mean of data"""
ret = data - numpy.expand_dims(numpy.mean(data, axis), axis)
return ret
detrend_types = {
'constant': detrend_constant,
}
def get_detrend(type):
"""Return detrending function"""
try:
detrend_func = detrend_types[type]
except KeyError:
raise ValueError('Unknown detrend type.')
return detrend_func
### PSD functions ###
def periodogram(x, fs, nperseg, window='boxcar', detrend='constant', scaling='density'):
"""Estimate power spectral density using a periodogram
(same as Welch's method with zero overlap and same number of FFT bins as input samples)
"""
if nperseg < len(x):
x = x[:nperseg]
elif nperseg > len(x):
nperseg = len(x)
return welch(x, fs, nperseg, window=window, noverlap=0, detrend=detrend, scaling=scaling)
def welch(x, fs, nperseg, window='hann', noverlap=None, detrend='constant', scaling='density'):
"""Estimate power spectral density using Welch's method"""
if noverlap is None:
noverlap = nperseg // 2
freqs, time, Pxx = _spectral_helper(x, fs, nperseg, window=window, noverlap=noverlap,
detrend=detrend, scaling=scaling, mode='psd')
# Average over windows
if len(Pxx.shape) >= 2 and Pxx.size > 0:
if Pxx.shape[-1] > 1:
Pxx = Pxx.mean(axis=-1)
else:
Pxx = numpy.reshape(Pxx, Pxx.shape[:-1])
return freqs, Pxx.real
def spectrogram(x, fs, nperseg, window=('tukey', 0.25), noverlap=None, detrend='constant',
scaling='density', mode='psd'):
"""Compute a spectrogram with consecutive Fourier transforms"""
if noverlap is None:
noverlap = nperseg // 8
# Modes:
# 'psd' uses Welch's method
# 'complex' is equivalent to the output of `stft` with no padding or boundary extension
# 'magnitude' returns the absolute magnitude of the STFT
if mode == 'psd':
freqs, time, Sxx = _spectral_helper(x, fs, nperseg, window=window, noverlap=noverlap,
detrend=detrend, scaling=scaling, mode='psd')
else:
freqs, time, Sxx = _spectral_helper(x, fs, nperseg, window=window, noverlap=noverlap,
detrend=detrend, scaling=scaling, mode='stft',
boundary=None, padded=False)
if mode == 'magnitude':
Sxx = numpy.abs(Sxx)
elif mode == 'complex':
pass
return freqs, time, Sxx
def stft(x, fs, nperseg, window='hann', noverlap=None, detrend=False, boundary='zeros', padded=True):
"""Compute the Short Time Fourier Transform (STFT)"""
if noverlap is None:
noverlap = nperseg // 2
freqs, time, Zxx = _spectral_helper(x, fs, nperseg, window=window, noverlap=noverlap,
detrend=detrend, scaling='spectrum', mode='stft',
boundary=boundary, padded=padded)
return freqs, time, Zxx
### Helper functions ###
def _spectral_helper(x, fs, nperseg, window='hann', noverlap=None, detrend='constant',
scaling='spectrum', mode='psd', boundary=None, padded=False):
"""Calculate various forms of windowed FFTs for STFT, PSD, etc."""
if noverlap is None:
noverlap = nperseg // 2
nstep = nperseg - noverlap
outdtype = numpy.result_type(x, numpy.complex64)
win = get_window(window, nperseg)
if numpy.result_type(win, numpy.complex64) != outdtype:
win = win.astype(outdtype)
# Extend input signal at both ends
if boundary is not None:
x = extend_boundaries(x, nperseg // 2, boundary)
# Make the input signal zero-padded at the end to make the signal
# fit exactly into an integer number of window segments
if padded:
nadd = (-(len(x) - nperseg) % nstep) % nperseg
x = numpy.concatenate((x, numpy.zeros(nadd)), axis=-1)
# Specify how to detrend each segment
if not detrend:
def detrend_func(d):
return d
elif not hasattr(detrend, '__call__'):
detrend_func = get_detrend(detrend)
else:
detrend_func = detrend
# Calculate windowed FFT
freqs = numpy.fft.fftfreq(nperseg, 1 / fs)
result = _fft_helper(x, win, detrend_func, nperseg, noverlap)
if mode == 'psd':
result = numpy.conjugate(result) * result
# Scale result as the power spectral density ('density')
# where `Pxx` has units of V**2/Hz or the power spectrum
# ('spectrum') where `Pxx` has units of V**2
if scaling == 'density':
scale = 1.0 / (fs * (win * win).sum())
elif scaling == 'spectrum':
scale = 1.0 / win.sum()**2
if mode == 'stft':
scale = numpy.sqrt(scale)
result *= scale
result = result.astype(outdtype)
# All imaginary parts are zero anyways
if mode != 'stft':
result = result.real
# Create array of times corresponding to each data segment
time = numpy.arange(nperseg / 2, x.shape[-1] - nperseg / 2 + 1, nperseg - noverlap) / float(fs)
if boundary is not None:
time -= (nperseg / 2) / fs
# Make sure window/time index is last axis
result = numpy.rollaxis(result, -1, -2)
return freqs, time, result
def _fft_helper(x, win, detrend_func, nperseg, noverlap):
"""Calculate windowed FFT, for internal use by _spectral_helper"""
step = nperseg - noverlap
shape = ((len(x) - noverlap) // step, nperseg)
strides = (step * x.strides[-1], x.strides[-1])
result = numpy.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
# Detrend each data segment individually
result = detrend_func(result)
# Apply window by multiplication
result = win * result
# Compute FFT
# Note: numpy.fft.fft gives slightly different results than scipy.fftpack.fft,
# maybe different precision? Anyway, difference is small and can be ignored.
t = time.time()
result = fft(result, n=nperseg)
logger.debug('FFT time: {:.3f} s'.format(time.time() - t))
return result