Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

57 multiple sampling rates #58

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libbids/instruments/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from .instrument import Instrument
from .eeg_instrument import EEGInstrument
from .ieeg_instrument import IEEGInstrument
from .instrument import Instrument
from .physio_instrument import PhysioInstrument
from .read_instrument import ReadInstrument
from .stim_instrument import StimInstrument
from .write_instrument import WriteInstrument

EEGInstrument
IEEGInstrument
Instrument
PhysioInstrument
ReadInstrument
Expand Down
89 changes: 67 additions & 22 deletions libbids/instruments/eeg_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ def __init__(
self,
session: "Session",
device: Any,
sfreq: int,
sfreq: Union[int, List[int]],
electrodes: List[str],
physical_dimension: str = "uV",
physical_lim: Tuple = (-1000.0, 1000.0),
preamp_filter: str = "",
record_duration: float = 1.0,
init_read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
is_digital: bool = False,
**kwargs
):
"""Initialize a device for collecting electroecephalograms
Expand All @@ -45,8 +46,10 @@ def __init__(
Session currently using this instrument
device : Any
The a class that respresents a device for eeg data collection
sfreq : int
The device's sampling frequency
sfreq : Union[int, List[int]]
The device's sampling frequency. All channels/electrodes assume the
same sampling rate if a signle integer is provided, else a list of
sampling rates must be provided for each channel/electrode
electrodes: List[str]
A list of electrode names associated with the device
physical_dimension : str
Expand All @@ -66,14 +69,20 @@ def __init__(
callable, the function is simply called
read_fn : Union[Tuple[str, Dict], Callable]
Similar to `init_read_fn`, but used for sampling data from the device
is_digital : bool
Whether the data recorded from the device is in a digital format or
a physical floating point integer (e.g., µV)
kwargs : Dict
This keyword arguments dictionary is used to supply detailes to the
edf file header. <See
https://pyedflib.readthedocs.io/en/latest/_modules/pyedflib/edfwriter.html#EdfWriter.setHeader>
"""
super(EEGInstrument, self).__init__(session, Modality.EEG, file_ext="edf")
self.sfreq: int = sfreq
self.sfreqs: List[int] = sfreq if isinstance(sfreq, List) else [sfreq]
assert len(electrodes) > 1, "Must supply electrodes"
assert len(self.sfreqs) == 1 or len(self.sfreqs) == len(
electrodes
), "Must supply same number of sampling rates as electrodes"
self.device: Any = device
self.electrodes: List[str] = electrodes
self.physical_dimension: str = physical_dimension
Expand All @@ -82,9 +91,11 @@ def __init__(
self.record_duration: float = record_duration
self.init_read_fn: Union[Tuple[str, List, Dict], Callable] = init_read_fn
self.read_fn: Union[Tuple[str, List, Dict], Callable] = read_fn
self.is_digital: bool = is_digital
self.modality_path.mkdir(exist_ok=True)
self.metadata: Dict = self._fixup_edf_metadata(kwargs)
self.buffer: np.ndarray
self.buffers: List[np.ndarray] = [np.array([]) for i in range(len(self.sfreqs))]

def annotate(self, onset: float, duration: float, description: str):
assert self.writer.writeAnnotation(onset, duration, description) == 0
Expand All @@ -93,12 +104,20 @@ def device_init_read(self):
"""Initializes reading on the device"""
if isinstance(self.init_read_fn, Callable):
self.init_read_fn()
else:
fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)

fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)
def device_read(self) -> Union[np.ndarray, List]:
"""read data from the device

def device_read(self) -> np.ndarray:
"""read data from the device"""
Returns
-------
np.ndarray
If all channels share the same sampling rate
List
If not all channels share the same sampling rate
"""
if isinstance(self.read_fn, Callable): # type: ignore
return cast(Callable, self.read_fn)()

Expand All @@ -109,7 +128,7 @@ def flush(self) -> None:
"""Read data from the device simply to discard"""
self.device_read()

def read(self, remainder: bool = False) -> np.ndarray:
def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data

Parameters
Expand All @@ -123,18 +142,42 @@ def read(self, remainder: bool = False) -> np.ndarray:
np.ndarray
A 2D array of data in the shape of (channels, time)
"""
# samples
samples: np.ndarray = self.device_read()
self.buffer = np.c_[self.buffer, samples]
if (not remainder) and (self.buffer.shape[1] >= 256):
writebuf: np.ndarray = self.buffer[:, :256]
self.buffer = self.buffer[:, 256:]
self.writer.writeSamples(np.ascontiguousarray(writebuf))
elif remainder and (self.buffer.shape[1] > 0):
writebuf = self.buffer[:, :256]
self.writer.writeSamples(np.ascontiguousarray(writebuf))

return samples
if len(self.sfreqs) == 1:
sfreq: int = self.sfreqs[0]
period: int = int(sfreq * self.record_duration)
samples: np.ndarray = cast(np.ndarray, self.device_read())
self.buffer = np.c_[self.buffer, samples]
if (not remainder) and (self.buffer.shape[1] >= period):
writebuf: np.ndarray = self.buffer[:, :period]
self.buffer = self.buffer[:, period:]
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
elif remainder and (self.buffer.shape[1] > 0):
writebuf = self.buffer[:, :period]
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
return samples
else:
periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs]
ch_samples: List = cast(List, self.device_read())
assert len(ch_samples) == len(
self.sfreqs
), "Data must be the same length as the number sfreqs"
self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)]
period_met: np.bool_ = np.all(
[i.shape[0] >= j for i, j in zip(self.buffers, periods)]
)
has_data: np.bool_ = np.any([i.shape[0] > 0 for i in self.buffers])
if (not remainder) and period_met:
writebufs: List = [i[:j] for i, j in zip(self.buffers, periods)]
self.buffers = [i[j:] for i, j in zip(self.buffers, periods)]
self.writer.writeSamples(writebufs, digital=self.is_digital)
elif remainder and has_data:
writebufs = [i[:j] for i, j in zip(self.buffers, periods)]
self.writer.writeSamples(writebufs, digital=self.is_digital)
return ch_samples

def start(self, task: str, run_id: str):
"""Begin recording a run
Expand Down Expand Up @@ -203,6 +246,8 @@ def _initialize_edf_file(self) -> None:
self.writer.setPhysicalDimension(i, self.physical_dimension)
self.writer.setPhysicalMaximum(i, self.physical_lim[0])
self.writer.setPhysicalMinimum(i, self.physical_lim[1])
self.writer.setSamplefrequency(i, self.sfreq)
self.writer.setSamplefrequency(
i, self.sfreqs[0] if len(self.sfreqs) == 1 else self.sfreqs[i]
)
if "AUX" not in el:
self.writer.setPrefilter(i, self.preamp_filter)
91 changes: 68 additions & 23 deletions libbids/instruments/ieeg_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ def __init__(
self,
session: "Session",
device: Any,
sfreq: int,
sfreq: Union[int, List[int]],
electrodes: List[str],
physical_dimension: str = "uV",
physical_lim: Tuple = (-1000.0, 1000.0),
preamp_filter: str = "",
record_duration: float = 1.0,
init_read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None,
is_digital: bool = False,
**kwargs
):
"""Initialize a device for collecting electroecephalograms
Expand All @@ -45,9 +46,11 @@ def __init__(
session : Session
Session currently using this instrument
device : Any
The a class that respresents a device for eeg data collection
sfreq : int
The device's sampling frequency
The a class that respresents a device for ieeg data collection
sfreq : Union[int, List[int]]
The device's sampling frequency. All channels/electrodes assume the
same sampling rate if a signle integer is provided, else a list of
sampling rates must be provided for each channel/electrode
electrodes: List[str]
A list of electrode names associated with the device
physical_dimension : str
Expand All @@ -67,14 +70,20 @@ def __init__(
callable, the function is simply called
read_fn : Union[Tuple[str, Dict], Callable]
Similar to `init_read_fn`, but used for sampling data from the device
is_digital : bool
Whether the data recorded from the device is in a digital format or
a physical floating point integer (e.g., µV)
kwargs : Dict
This keyword arguments dictionary is used to supply detailes to the
edf file header. <See
https://pyedflib.readthedocs.io/en/latest/_modules/pyedflib/edfwriter.html#EdfWriter.setHeader>
"""
super(IEEGInstrument, self).__init__(session, Modality.iEEG, file_ext="edf")
self.sfreq: int = sfreq
self.sfreqs: List[int] = sfreq if isinstance(sfreq, List) else [sfreq]
assert len(electrodes) > 1, "Must supply electrodes"
assert len(self.sfreqs) == 1 or len(self.sfreqs) == len(
electrodes
), "Must supply same number of sampling rates as electrodes"
self.device: Any = device
self.electrodes: List[str] = electrodes
self.physical_dimension: str = physical_dimension
Expand All @@ -83,9 +92,11 @@ def __init__(
self.record_duration: float = record_duration
self.init_read_fn: Union[Tuple[str, List, Dict], Callable] = init_read_fn
self.read_fn: Union[Tuple[str, List, Dict], Callable] = read_fn
self.is_digital: bool = is_digital
self.modality_path.mkdir(exist_ok=True)
self.metadata: Dict = self._fixup_edf_metadata(kwargs)
self.buffer: np.ndarray
self.buffers: List[np.ndarray] = [np.array([]) for i in range(len(self.sfreqs))]

def annotate(self, onset: float, duration: float, description: str):
assert self.writer.writeAnnotation(onset, duration, description) == 0
Expand All @@ -94,12 +105,20 @@ def device_init_read(self):
"""Initializes reading on the device"""
if isinstance(self.init_read_fn, Callable):
self.init_read_fn()
else:
fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)

fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)
def device_read(self) -> Union[np.ndarray, List]:
"""read data from the device

def device_read(self) -> np.ndarray:
"""read data from the device"""
Returns
-------
np.ndarray
If all channels share the same sampling rate
List
If not all channels share the same sampling rate
"""
if isinstance(self.read_fn, Callable): # type: ignore
return cast(Callable, self.read_fn)()

Expand All @@ -110,7 +129,7 @@ def flush(self) -> None:
"""Read data from the device simply to discard"""
self.device_read()

def read(self, remainder: bool = False) -> np.ndarray:
def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data

Parameters
Expand All @@ -124,18 +143,42 @@ def read(self, remainder: bool = False) -> np.ndarray:
np.ndarray
A 2D array of data in the shape of (channels, time)
"""
# samples
samples: np.ndarray = self.device_read()
self.buffer = np.c_[self.buffer, samples]
if (not remainder) and (self.buffer.shape[1] >= 256):
writebuf: np.ndarray = self.buffer[:, :256]
self.buffer = self.buffer[:, 256:]
self.writer.writeSamples(np.ascontiguousarray(writebuf))
elif remainder and (self.buffer.shape[1] > 0):
writebuf = self.buffer[:, :256]
self.writer.writeSamples(np.ascontiguousarray(writebuf))

return samples
if len(self.sfreqs) == 1:
sfreq: int = self.sfreqs[0]
period: int = int(sfreq * self.record_duration)
samples: np.ndarray = cast(np.ndarray, self.device_read())
self.buffer = np.c_[self.buffer, samples]
if (not remainder) and (self.buffer.shape[1] >= period):
writebuf: np.ndarray = self.buffer[:, :period]
self.buffer = self.buffer[:, period:]
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
elif remainder and (self.buffer.shape[1] > 0):
writebuf = self.buffer[:, :period]
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
return samples
else:
periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs]
ch_samples: List = cast(List, self.device_read())
assert len(ch_samples) == len(
self.sfreqs
), "Data must be the same length as the number sfreqs"
self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)]
period_met: np.bool_ = np.all(
[i.shape[0] >= j for i, j in zip(self.buffers, periods)]
)
has_data: np.bool_ = np.any([i.shape[0] > 0 for i in self.buffers])
if (not remainder) and period_met:
writebufs: List = [i[:j] for i, j in zip(self.buffers, periods)]
self.buffers = [i[j:] for i, j in zip(self.buffers, periods)]
self.writer.writeSamples(writebufs, digital=self.is_digital)
elif remainder and has_data:
writebufs = [i[:j] for i, j in zip(self.buffers, periods)]
self.writer.writeSamples(writebufs, digital=self.is_digital)
return ch_samples

def start(self, task: str, run_id: str):
"""Begin recording a run
Expand Down Expand Up @@ -204,6 +247,8 @@ def _initialize_edf_file(self) -> None:
self.writer.setPhysicalDimension(i, self.physical_dimension)
self.writer.setPhysicalMaximum(i, self.physical_lim[0])
self.writer.setPhysicalMinimum(i, self.physical_lim[1])
self.writer.setSamplefrequency(i, self.sfreq)
self.writer.setSamplefrequency(
i, self.sfreqs[0] if len(self.sfreqs) == 1 else self.sfreqs[i]
)
if "AUX" not in el:
self.writer.setPrefilter(i, self.preamp_filter)
4 changes: 2 additions & 2 deletions libbids/instruments/instrument.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Instrument for stimulating or recording"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, TYPE_CHECKING, Union, cast
from typing import List, Optional, TYPE_CHECKING, Union, cast

from ..enums import Modality

Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(
self.task_id: str = ""
self.run_id: str = ""
self._started: bool = False
self.sfreq: int
self.sfreqs: List[int]

@abstractmethod
def start(self, task_id: str, run_id: str):
Expand Down
3 changes: 2 additions & 1 deletion libbids/instruments/read_instrument.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
from abc import abstractmethod
from typing import List, Union

from .instrument import Instrument

Expand All @@ -14,7 +15,7 @@ def flush(self) -> None:
raise Exception("Method not implemented")

@abstractmethod
def read(self, remainder: bool = False) -> np.ndarray:
def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data

Parameters
Expand Down
2 changes: 1 addition & 1 deletion libbids/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def prefix(self) -> str:

@property
def sfreq(self) -> int:
return self.task.primary_instrument.sfreq
return self.task.primary_instrument.sfreqs[0]

@property
def subject_dir(self) -> Path:
Expand Down
Loading