diff --git a/libbids/instruments/__init__.py b/libbids/instruments/__init__.py index 2581466..d322d21 100644 --- a/libbids/instruments/__init__.py +++ b/libbids/instruments/__init__.py @@ -1,11 +1,13 @@ -from .instrument import Instrument from .eeg_instrument import EEGInstrument +from .ieeg_instrument import IEEGInstrument +from .instrument import Instrument from .physio_instrument import PhysioInstrument from .read_instrument import ReadInstrument from .stim_instrument import StimInstrument from .write_instrument import WriteInstrument EEGInstrument +IEEGInstrument Instrument PhysioInstrument ReadInstrument diff --git a/libbids/instruments/eeg_instrument.py b/libbids/instruments/eeg_instrument.py index 1a0146d..e2884ad 100644 --- a/libbids/instruments/eeg_instrument.py +++ b/libbids/instruments/eeg_instrument.py @@ -27,7 +27,7 @@ def __init__( self, session: "Session", device: Any, - sfreq: int, + sfreq: Union[int, List[int]], electrodes: List[str], physical_dimension: str = "uV", physical_lim: Tuple = (-1000.0, 1000.0), @@ -35,6 +35,7 @@ def __init__( record_duration: float = 1.0, init_read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None, read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None, + is_digital: bool = False, **kwargs ): """Initialize a device for collecting electroecephalograms @@ -45,8 +46,10 @@ def __init__( Session currently using this instrument device : Any The a class that respresents a device for eeg data collection - sfreq : int - The device's sampling frequency + sfreq : Union[int, List[int]] + The device's sampling frequency. All channels/electrodes assume the + same sampling rate if a signle integer is provided, else a list of + sampling rates must be provided for each channel/electrode electrodes: List[str] A list of electrode names associated with the device physical_dimension : str @@ -66,14 +69,20 @@ def __init__( callable, the function is simply called read_fn : Union[Tuple[str, Dict], Callable] Similar to `init_read_fn`, but used for sampling data from the device + is_digital : bool + Whether the data recorded from the device is in a digital format or + a physical floating point integer (e.g., µV) kwargs : Dict This keyword arguments dictionary is used to supply detailes to the edf file header. """ super(EEGInstrument, self).__init__(session, Modality.EEG, file_ext="edf") - self.sfreq: int = sfreq + self.sfreqs: List[int] = sfreq if isinstance(sfreq, List) else [sfreq] assert len(electrodes) > 1, "Must supply electrodes" + assert len(self.sfreqs) == 1 or len(self.sfreqs) == len( + electrodes + ), "Must supply same number of sampling rates as electrodes" self.device: Any = device self.electrodes: List[str] = electrodes self.physical_dimension: str = physical_dimension @@ -82,9 +91,11 @@ def __init__( self.record_duration: float = record_duration self.init_read_fn: Union[Tuple[str, List, Dict], Callable] = init_read_fn self.read_fn: Union[Tuple[str, List, Dict], Callable] = read_fn + self.is_digital: bool = is_digital self.modality_path.mkdir(exist_ok=True) self.metadata: Dict = self._fixup_edf_metadata(kwargs) self.buffer: np.ndarray + self.buffers: List[np.ndarray] = [np.array([]) for i in range(len(self.sfreqs))] def annotate(self, onset: float, duration: float, description: str): assert self.writer.writeAnnotation(onset, duration, description) == 0 @@ -93,12 +104,20 @@ def device_init_read(self): """Initializes reading on the device""" if isinstance(self.init_read_fn, Callable): self.init_read_fn() + else: + fn, args, kwargs = cast(Tuple, self.init_read_fn) + self.device.__getattribute__(fn)(*args, **kwargs) - fn, args, kwargs = cast(Tuple, self.init_read_fn) - self.device.__getattribute__(fn)(*args, **kwargs) + def device_read(self) -> Union[np.ndarray, List]: + """read data from the device - def device_read(self) -> np.ndarray: - """read data from the device""" + Returns + ------- + np.ndarray + If all channels share the same sampling rate + List + If not all channels share the same sampling rate + """ if isinstance(self.read_fn, Callable): # type: ignore return cast(Callable, self.read_fn)() @@ -109,7 +128,7 @@ def flush(self) -> None: """Read data from the device simply to discard""" self.device_read() - def read(self, remainder: bool = False) -> np.ndarray: + def read(self, remainder: bool = False) -> Union[List, np.ndarray]: """Read data from the headset and return the data Parameters @@ -123,18 +142,42 @@ def read(self, remainder: bool = False) -> np.ndarray: np.ndarray A 2D array of data in the shape of (channels, time) """ - # samples - samples: np.ndarray = self.device_read() - self.buffer = np.c_[self.buffer, samples] - if (not remainder) and (self.buffer.shape[1] >= 256): - writebuf: np.ndarray = self.buffer[:, :256] - self.buffer = self.buffer[:, 256:] - self.writer.writeSamples(np.ascontiguousarray(writebuf)) - elif remainder and (self.buffer.shape[1] > 0): - writebuf = self.buffer[:, :256] - self.writer.writeSamples(np.ascontiguousarray(writebuf)) - - return samples + if len(self.sfreqs) == 1: + sfreq: int = self.sfreqs[0] + period: int = int(sfreq * self.record_duration) + samples: np.ndarray = cast(np.ndarray, self.device_read()) + self.buffer = np.c_[self.buffer, samples] + if (not remainder) and (self.buffer.shape[1] >= period): + writebuf: np.ndarray = self.buffer[:, :period] + self.buffer = self.buffer[:, period:] + self.writer.writeSamples( + np.ascontiguousarray(writebuf), digital=self.is_digital + ) + elif remainder and (self.buffer.shape[1] > 0): + writebuf = self.buffer[:, :period] + self.writer.writeSamples( + np.ascontiguousarray(writebuf), digital=self.is_digital + ) + return samples + else: + periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs] + ch_samples: List = cast(List, self.device_read()) + assert len(ch_samples) == len( + self.sfreqs + ), "Data must be the same length as the number sfreqs" + self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)] + period_met: np.bool_ = np.all( + [i.shape[0] >= j for i, j in zip(self.buffers, periods)] + ) + has_data: np.bool_ = np.any([i.shape[0] > 0 for i in self.buffers]) + if (not remainder) and period_met: + writebufs: List = [i[:j] for i, j in zip(self.buffers, periods)] + self.buffers = [i[j:] for i, j in zip(self.buffers, periods)] + self.writer.writeSamples(writebufs, digital=self.is_digital) + elif remainder and has_data: + writebufs = [i[:j] for i, j in zip(self.buffers, periods)] + self.writer.writeSamples(writebufs, digital=self.is_digital) + return ch_samples def start(self, task: str, run_id: str): """Begin recording a run @@ -203,6 +246,8 @@ def _initialize_edf_file(self) -> None: self.writer.setPhysicalDimension(i, self.physical_dimension) self.writer.setPhysicalMaximum(i, self.physical_lim[0]) self.writer.setPhysicalMinimum(i, self.physical_lim[1]) - self.writer.setSamplefrequency(i, self.sfreq) + self.writer.setSamplefrequency( + i, self.sfreqs[0] if len(self.sfreqs) == 1 else self.sfreqs[i] + ) if "AUX" not in el: self.writer.setPrefilter(i, self.preamp_filter) diff --git a/libbids/instruments/ieeg_instrument.py b/libbids/instruments/ieeg_instrument.py index 81362a0..1bfb92f 100644 --- a/libbids/instruments/ieeg_instrument.py +++ b/libbids/instruments/ieeg_instrument.py @@ -28,7 +28,7 @@ def __init__( self, session: "Session", device: Any, - sfreq: int, + sfreq: Union[int, List[int]], electrodes: List[str], physical_dimension: str = "uV", physical_lim: Tuple = (-1000.0, 1000.0), @@ -36,6 +36,7 @@ def __init__( record_duration: float = 1.0, init_read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None, read_fn: Union[Tuple[str, list, Dict], Callable] = lambda: None, + is_digital: bool = False, **kwargs ): """Initialize a device for collecting electroecephalograms @@ -45,9 +46,11 @@ def __init__( session : Session Session currently using this instrument device : Any - The a class that respresents a device for eeg data collection - sfreq : int - The device's sampling frequency + The a class that respresents a device for ieeg data collection + sfreq : Union[int, List[int]] + The device's sampling frequency. All channels/electrodes assume the + same sampling rate if a signle integer is provided, else a list of + sampling rates must be provided for each channel/electrode electrodes: List[str] A list of electrode names associated with the device physical_dimension : str @@ -67,14 +70,20 @@ def __init__( callable, the function is simply called read_fn : Union[Tuple[str, Dict], Callable] Similar to `init_read_fn`, but used for sampling data from the device + is_digital : bool + Whether the data recorded from the device is in a digital format or + a physical floating point integer (e.g., µV) kwargs : Dict This keyword arguments dictionary is used to supply detailes to the edf file header. """ super(IEEGInstrument, self).__init__(session, Modality.iEEG, file_ext="edf") - self.sfreq: int = sfreq + self.sfreqs: List[int] = sfreq if isinstance(sfreq, List) else [sfreq] assert len(electrodes) > 1, "Must supply electrodes" + assert len(self.sfreqs) == 1 or len(self.sfreqs) == len( + electrodes + ), "Must supply same number of sampling rates as electrodes" self.device: Any = device self.electrodes: List[str] = electrodes self.physical_dimension: str = physical_dimension @@ -83,9 +92,11 @@ def __init__( self.record_duration: float = record_duration self.init_read_fn: Union[Tuple[str, List, Dict], Callable] = init_read_fn self.read_fn: Union[Tuple[str, List, Dict], Callable] = read_fn + self.is_digital: bool = is_digital self.modality_path.mkdir(exist_ok=True) self.metadata: Dict = self._fixup_edf_metadata(kwargs) self.buffer: np.ndarray + self.buffers: List[np.ndarray] = [np.array([]) for i in range(len(self.sfreqs))] def annotate(self, onset: float, duration: float, description: str): assert self.writer.writeAnnotation(onset, duration, description) == 0 @@ -94,12 +105,20 @@ def device_init_read(self): """Initializes reading on the device""" if isinstance(self.init_read_fn, Callable): self.init_read_fn() + else: + fn, args, kwargs = cast(Tuple, self.init_read_fn) + self.device.__getattribute__(fn)(*args, **kwargs) - fn, args, kwargs = cast(Tuple, self.init_read_fn) - self.device.__getattribute__(fn)(*args, **kwargs) + def device_read(self) -> Union[np.ndarray, List]: + """read data from the device - def device_read(self) -> np.ndarray: - """read data from the device""" + Returns + ------- + np.ndarray + If all channels share the same sampling rate + List + If not all channels share the same sampling rate + """ if isinstance(self.read_fn, Callable): # type: ignore return cast(Callable, self.read_fn)() @@ -110,7 +129,7 @@ def flush(self) -> None: """Read data from the device simply to discard""" self.device_read() - def read(self, remainder: bool = False) -> np.ndarray: + def read(self, remainder: bool = False) -> Union[List, np.ndarray]: """Read data from the headset and return the data Parameters @@ -124,18 +143,42 @@ def read(self, remainder: bool = False) -> np.ndarray: np.ndarray A 2D array of data in the shape of (channels, time) """ - # samples - samples: np.ndarray = self.device_read() - self.buffer = np.c_[self.buffer, samples] - if (not remainder) and (self.buffer.shape[1] >= 256): - writebuf: np.ndarray = self.buffer[:, :256] - self.buffer = self.buffer[:, 256:] - self.writer.writeSamples(np.ascontiguousarray(writebuf)) - elif remainder and (self.buffer.shape[1] > 0): - writebuf = self.buffer[:, :256] - self.writer.writeSamples(np.ascontiguousarray(writebuf)) - - return samples + if len(self.sfreqs) == 1: + sfreq: int = self.sfreqs[0] + period: int = int(sfreq * self.record_duration) + samples: np.ndarray = cast(np.ndarray, self.device_read()) + self.buffer = np.c_[self.buffer, samples] + if (not remainder) and (self.buffer.shape[1] >= period): + writebuf: np.ndarray = self.buffer[:, :period] + self.buffer = self.buffer[:, period:] + self.writer.writeSamples( + np.ascontiguousarray(writebuf), digital=self.is_digital + ) + elif remainder and (self.buffer.shape[1] > 0): + writebuf = self.buffer[:, :period] + self.writer.writeSamples( + np.ascontiguousarray(writebuf), digital=self.is_digital + ) + return samples + else: + periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs] + ch_samples: List = cast(List, self.device_read()) + assert len(ch_samples) == len( + self.sfreqs + ), "Data must be the same length as the number sfreqs" + self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)] + period_met: np.bool_ = np.all( + [i.shape[0] >= j for i, j in zip(self.buffers, periods)] + ) + has_data: np.bool_ = np.any([i.shape[0] > 0 for i in self.buffers]) + if (not remainder) and period_met: + writebufs: List = [i[:j] for i, j in zip(self.buffers, periods)] + self.buffers = [i[j:] for i, j in zip(self.buffers, periods)] + self.writer.writeSamples(writebufs, digital=self.is_digital) + elif remainder and has_data: + writebufs = [i[:j] for i, j in zip(self.buffers, periods)] + self.writer.writeSamples(writebufs, digital=self.is_digital) + return ch_samples def start(self, task: str, run_id: str): """Begin recording a run @@ -204,6 +247,8 @@ def _initialize_edf_file(self) -> None: self.writer.setPhysicalDimension(i, self.physical_dimension) self.writer.setPhysicalMaximum(i, self.physical_lim[0]) self.writer.setPhysicalMinimum(i, self.physical_lim[1]) - self.writer.setSamplefrequency(i, self.sfreq) + self.writer.setSamplefrequency( + i, self.sfreqs[0] if len(self.sfreqs) == 1 else self.sfreqs[i] + ) if "AUX" not in el: self.writer.setPrefilter(i, self.preamp_filter) diff --git a/libbids/instruments/instrument.py b/libbids/instruments/instrument.py index 5710748..83e9afc 100644 --- a/libbids/instruments/instrument.py +++ b/libbids/instruments/instrument.py @@ -1,7 +1,7 @@ """Instrument for stimulating or recording""" from abc import ABC, abstractmethod from pathlib import Path -from typing import Optional, TYPE_CHECKING, Union, cast +from typing import List, Optional, TYPE_CHECKING, Union, cast from ..enums import Modality @@ -60,7 +60,7 @@ def __init__( self.task_id: str = "" self.run_id: str = "" self._started: bool = False - self.sfreq: int + self.sfreqs: List[int] @abstractmethod def start(self, task_id: str, run_id: str): diff --git a/libbids/instruments/read_instrument.py b/libbids/instruments/read_instrument.py index 751fc1f..e4b1bc3 100644 --- a/libbids/instruments/read_instrument.py +++ b/libbids/instruments/read_instrument.py @@ -1,5 +1,6 @@ import numpy as np from abc import abstractmethod +from typing import List, Union from .instrument import Instrument @@ -14,7 +15,7 @@ def flush(self) -> None: raise Exception("Method not implemented") @abstractmethod - def read(self, remainder: bool = False) -> np.ndarray: + def read(self, remainder: bool = False) -> Union[List, np.ndarray]: """Read data from the headset and return the data Parameters diff --git a/libbids/run.py b/libbids/run.py index b34ab23..b23f177 100644 --- a/libbids/run.py +++ b/libbids/run.py @@ -171,7 +171,7 @@ def prefix(self) -> str: @property def sfreq(self) -> int: - return self.task.primary_instrument.sfreq + return self.task.primary_instrument.sfreqs[0] @property def subject_dir(self) -> Path: