-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathchest_belt.py
378 lines (321 loc) · 12.5 KB
/
chest_belt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
"""Denoising metrics for chest belt recordings."""
import numpy as np
import pandas as pd
from physutils import io, physio
from scipy.interpolate import interp1d
from scipy.stats import zscore
from .. import references
from ..due import due
from .responses import rrf
from .utils import apply_function_in_sliding_window as afsw
from .utils import convolve_and_rescale, return_physio_or_metric, rms_envelope_1d
@due.dcite(references.BIRN_2006)
@return_physio_or_metric()
@physio.make_operation()
def respiratory_variance_time(
data, peaks=None, troughs=None, fs=None, lags=(0, 4, 8, 12), **kwargs
):
"""
Implement the Respiratory Variance over Time (Birn et al. 2006).
Procedural choices influenced by RetroTS
Parameters
----------
data : physutils.Physio, np.ndarray, or array-like object
Object containing the timeseries of the recorded respiratory signal
fs : float, optional if data is a physutils.Physio
Sampling rate of `data` in Hz
peaks : :obj:`numpy.ndarray`, optional if data is a physutils.Physio
Indices of peaks in `data`
troughs : :obj:`numpy.ndarray`, optional if data is a physutils.Physio
Indices of troughs in `data`
lags: tuple
lags in seconds of the RVT output. Default is 0, 4, 8, 12.
Outputs
-------
rvt: array_like
calculated RVT and associated lags.
References
----------
.. [1] R. M. Birn, J. B. Diamond, M. A. Smith, P. A. Bandettini,“Separating
respiratory-variation-related fluctuations from neuronal-activity-related
fluctuations in fMRI”, NeuroImage, vol.31, pp. 1536-1548, 2006.
"""
if isinstance(data, physio.Physio):
# Initialize physio object
data = physio.check_physio(data, ensure_fs=True, copy=True)
elif fs is not None and peaks is not None and troughs is not None:
data = io.load_physio(data, fs=fs)
data._metadata["peaks"] = peaks
data._metadata["troughs"] = troughs
else:
raise ValueError(
"""
To use this function you should either provide a Physio object
with existing peaks and troughs metadata (e.g. using the peakdet module), or
by providing the physiological data timeseries, the sampling frequency,
and the peak and trough indices separately.
"""
)
if data.peaks.size == 0 or data.troughs.size == 0:
raise ValueError(
"""
Peaks and troughs must be non-empty lists.
Make sure to run peak/trough detection on your physiological data first,
using the peakdet module, or other software of your choice.
"""
)
timestep = 1 / data.fs
# respiration belt timing
time = np.arange(0, len(data) * timestep, timestep)
peak_vals = data[data.peaks]
trough_vals = data[data.troughs]
peak_time = time[data.peaks]
trough_time = time[data.troughs]
mid_peak_time = (peak_time[:-1] + peak_time[1:]) / 2
period = np.diff(peak_time)
# interpolate peak values over all timepoints
peak_interp = interp1d(
peak_time, peak_vals, bounds_error=False, fill_value="extrapolate"
)(time)
# interpolate trough values over all timepoints
trough_interp = interp1d(
trough_time, trough_vals, bounds_error=False, fill_value="extrapolate"
)(time)
# interpolate period over all timepoints
period_interp = interp1d(
mid_peak_time, period, bounds_error=False, fill_value="extrapolate"
)(time)
# full_rvt is (peak-trough)/period
full_rvt = (peak_interp - trough_interp) / period_interp
# calculate lags for RVT
rvt_lags = np.zeros((len(full_rvt), len(lags)))
for ind, lag in enumerate(lags):
start_index = np.argmin(np.abs(time - lag))
temp_rvt = np.concatenate(
(
np.full((start_index), full_rvt[0]),
full_rvt[: len(full_rvt) - start_index],
)
)
rvt_lags[:, ind] = temp_rvt
return data, rvt_lags
@due.dcite(references.POWER_2018)
@return_physio_or_metric()
@physio.make_operation()
def respiratory_pattern_variability(data, window, **kwargs):
"""Calculate respiratory pattern variability.
Parameters
----------
data : physutils.Physio, np.ndarray, or array-like object
Object containing the timeseries of the recorded respiratory signal
window : int
Window length in samples.
Returns
-------
rpv_val : float
Respiratory pattern variability value.
Notes
-----
This metric was first introduced in [1]_.
1. Z-score respiratory belt signal
2. Calculate upper envelope
3. Calculate standard deviation of envelope
References
----------
.. [1] J. D. Power et al., "Ridding fMRI data of motion-related influences:
Removal of signals with distinct spatial and physical bases in multiecho
data," Proceedings of the National Academy of Sciences, issue 9, vol.
115, pp. 2105-2114, 2018.
"""
# Initialize physio object
data = physio.check_physio(data, ensure_fs=False, copy=True)
# First, z-score respiratory traces
resp_z = zscore(data.data)
# Collect upper envelope
rpv_upper_env = rms_envelope_1d(resp_z, window)
# Calculate standard deviation
rpv_val = np.std(rpv_upper_env)
return data, rpv_val
@due.dcite(references.POWER_2020)
@return_physio_or_metric()
@physio.make_operation()
def env(data, fs=None, window=10, **kwargs):
"""Calculate respiratory pattern variability across a sliding window.
Parameters
----------
data : physutils.Physio, np.ndarray, or array-like object
Object containing the timeseries of the recorded respiratory signal
fs : float, optional if data is a physutils.Physio
Sampling rate of `data` in Hz
window : :obj:`int`, optional
Size of the sliding window, in seconds.
Default is 10.
Returns
-------
env_arr
Notes
-----
This metric was first introduced in [1]_.
Across a sliding window, do the following:
1. Z-score respiratory belt signal
2. Calculate upper envelope
3. Calculate standard deviation of envelope
References
----------
.. [1] J. D. Power et al., "Characteristics of respiratory measures in
young adults scanned at rest, including systematic changes and 'missed'
deep breaths," Neuroimage, vol. 204, 2020.
"""
def _respiratory_pattern_variability(data, window):
"""
Respiratory pattern variability function without utilizing
the physutils.Physio object history, only to be used within
the context of this function. This is done to only store the
chest_belt.env operation call to the history and not all the
subsequent sub-operations
"""
# First, z-score respiratory traces
resp_z = zscore(data)
# Collect upper envelope
rpv_upper_env = rms_envelope_1d(resp_z, window)
# Calculate standard deviation
rpv_val = np.std(rpv_upper_env)
return rpv_val
if isinstance(data, physio.Physio):
# Initialize physio object
data = physio.check_physio(data, ensure_fs=True, copy=True)
elif fs is not None:
data = io.load_physio(data, fs=fs)
else:
raise ValueError(
"""
To use this function you should either provide a Physio object
with the sampling frequency encapsulated, or
by providing the physiological data timeseries and the sampling
frequency separately.
"""
)
# Convert window to Hertz
window = int(window * data.fs)
# Calculate RPV across a rolling window
env_arr = (
pd.Series(data.data)
.rolling(window=window, center=True)
.apply(_respiratory_pattern_variability, args=(window,))
)
env_arr[np.isnan(env_arr)] = 0.0
return data, env_arr
@due.dcite(references.CHANG_GLOVER_2009)
@return_physio_or_metric()
@physio.make_operation()
def respiratory_variance(data, fs=None, window=6, **kwargs):
"""Calculate respiratory variance.
Parameters
----------
data : physutils.Physio, np.ndarray, or array-like object
Object containing the timeseries of the recorded respiratory signal
fs : float, optional if data is a physutils.Physio
Sampling rate of `data` in Hz
window : :obj:`int`, optional
Size of the sliding window, in seconds.
Default is 6.
Returns
-------
rv_out : (X, 2) :obj:`numpy.ndarray`
Respiratory variance values.
The first column is raw RV values, after normalization.
The second column is RV values convolved with the RRF, after normalization.
Notes
-----
Respiratory variance (RV) was introduced in [1]_, and consists of the
standard deviation of the respiratory trace within a 6-second window.
This metric is often lagged back and/or forward in time and convolved with
a respiratory response function before being included in a GLM.
Regressors also often have mean and linear trends removed and are
standardized prior to regressions.
References
----------
.. [1] C. Chang & G. H. Glover, "Relationship between respiration,
end-tidal CO2, and BOLD signals in resting-state fMRI," Neuroimage,
issue 4, vol. 47, pp. 1381-1393, 2009.
"""
if isinstance(data, physio.Physio):
# Initialize physio object
data = physio.check_physio(data, ensure_fs=True, copy=True)
elif fs is not None:
data = io.load_physio(data, fs=fs)
else:
raise ValueError(
"""
To use this function you should either provide a Physio object
with the sampling frequency encapsulated, or
by providing the physiological data timeseries and the sampling
frequency separately.
"""
)
# Convert window to Hertz
halfwindow_samples = int(round(window * data.fs / 2))
# Raw respiratory variance
rv_arr = afsw(data, np.std, halfwindow_samples)
# Convolve with rrf
rv_out = convolve_and_rescale(rv_arr, rrf(data.fs), rescale="zscore")
return data, rv_out
@return_physio_or_metric()
@physio.make_operation()
def respiratory_phase(data, n_scans, slice_timings, t_r, fs=None, **kwargs):
"""Calculate respiratory phase from respiratory signal.
Parameters
----------
data : physutils.Physio, np.ndarray, or array-like object
Object containing the timeseries of the recorded respiratory signal
n_scans
Number of volumes in the imaging run.
slice_timings
Slice times, in seconds.
t_r
Sample rate of the imaging run, in seconds.
fs : float, optional if data is a physutils.Physio
Sampling rate of `data` in Hz
Returns
-------
phase_resp : array_like
Respiratory phase signal, of shape (n_scans, n_slices).
"""
if isinstance(data, physio.Physio):
# Initialize physio object
data = physio.check_physio(data, ensure_fs=True, copy=True)
elif fs is not None:
data = io.load_physio(data, fs=fs)
else:
raise ValueError(
"""
To use this function you should either provide a Physio object
with the sampling frequency encapsulated, or
by providing the physiological data timeseries and the sampling
frequency separately.
"""
)
assert slice_timings.ndim == 1, "Slice times must be a 1D array"
n_slices = np.size(slice_timings)
phase_resp = np.zeros((n_scans, n_slices))
# generate histogram from respiratory signal
# TODO: Replace with numpy.histogram
resp_hist, resp_hist_bins = np.histogram(data, bins=100)
# first compute derivative of respiration signal
resp_diff = np.diff(data, n=1)
for i_slice in range(n_slices):
# generate slice acquisition timings across all scans
times_crSlice = t_r * np.arange(n_scans) + slice_timings[i_slice]
phase_resp_crSlice = np.zeros(n_scans)
for j_scan in range(n_scans):
iphys = int(
max([1, round(times_crSlice[j_scan] * data.fs)])
) # closest idx in resp waveform
iphys = min([iphys, len(resp_diff)]) # cannot be longer than resp_diff
thisBin = np.argmin(abs(data[iphys] - resp_hist_bins))
numerator = np.sum(resp_hist[0:thisBin])
phase_resp_crSlice[j_scan] = (
np.math.pi * np.sign(resp_diff[iphys]) * (numerator / len(data))
)
phase_resp[:, i_slice] = phase_resp_crSlice
return data, phase_resp