Skip to content

Commit

Permalink
Final edits for different sfreq types
Browse files Browse the repository at this point in the history
  • Loading branch information
kevincar committed Jan 25, 2024
1 parent 3a3d706 commit 19a2c84
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 20 deletions.
16 changes: 8 additions & 8 deletions libbids/instruments/eeg_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ def device_init_read(self):
"""Initializes reading on the device"""
if isinstance(self.init_read_fn, Callable):
self.init_read_fn()

fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)
else:
fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)

def device_read(self) -> Union[np.ndarray, List]:
"""read data from the device
Expand All @@ -128,7 +128,7 @@ def flush(self) -> None:
"""Read data from the device simply to discard"""
self.device_read()

def read(self, remainder: bool = False) -> np.ndarray:
def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data
Parameters
Expand Down Expand Up @@ -158,13 +158,14 @@ def read(self, remainder: bool = False) -> np.ndarray:
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
return samples
else:
periods: List = [int(f) * self.record_duration for f in self.sfreqs]
periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs]
ch_samples: List = cast(List, self.device_read())
assert len(ch_samples) == len(
self.sfreqs
), "Data must be the same length as the number sfreqs"
self.buffers = [np.c_[i, j] for i, j in zip(self.buffers, ch_samples)]
self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)]
period_met: np.bool_ = np.all(
[i.shape[0] >= j for i, j in zip(self.buffers, periods)]
)
Expand All @@ -176,8 +177,7 @@ def read(self, remainder: bool = False) -> np.ndarray:
elif remainder and has_data:
writebufs = [i[:j] for i, j in zip(self.buffers, periods)]
self.writer.writeSamples(writebufs, digital=self.is_digital)

return samples
return ch_samples

def start(self, task: str, run_id: str):
"""Begin recording a run
Expand Down
16 changes: 8 additions & 8 deletions libbids/instruments/ieeg_instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ def device_init_read(self):
"""Initializes reading on the device"""
if isinstance(self.init_read_fn, Callable):
self.init_read_fn()

fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)
else:
fn, args, kwargs = cast(Tuple, self.init_read_fn)
self.device.__getattribute__(fn)(*args, **kwargs)

def device_read(self) -> Union[np.ndarray, List]:
"""read data from the device
Expand All @@ -129,7 +129,7 @@ def flush(self) -> None:
"""Read data from the device simply to discard"""
self.device_read()

def read(self, remainder: bool = False) -> np.ndarray:
def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data
Parameters
Expand Down Expand Up @@ -159,13 +159,14 @@ def read(self, remainder: bool = False) -> np.ndarray:
self.writer.writeSamples(
np.ascontiguousarray(writebuf), digital=self.is_digital
)
return samples
else:
periods: List = [int(f) * self.record_duration for f in self.sfreqs]
periods: List[int] = [int(f * self.record_duration) for f in self.sfreqs]
ch_samples: List = cast(List, self.device_read())
assert len(ch_samples) == len(
self.sfreqs
), "Data must be the same length as the number sfreqs"
self.buffers = [np.c_[i, j] for i, j in zip(self.buffers, ch_samples)]
self.buffers = [np.r_[i, j] for i, j in zip(self.buffers, ch_samples)]
period_met: np.bool_ = np.all(
[i.shape[0] >= j for i, j in zip(self.buffers, periods)]
)
Expand All @@ -177,8 +178,7 @@ def read(self, remainder: bool = False) -> np.ndarray:
elif remainder and has_data:
writebufs = [i[:j] for i, j in zip(self.buffers, periods)]
self.writer.writeSamples(writebufs, digital=self.is_digital)

return samples
return ch_samples

def start(self, task: str, run_id: str):
"""Begin recording a run
Expand Down
4 changes: 2 additions & 2 deletions libbids/instruments/instrument.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Instrument for stimulating or recording"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, TYPE_CHECKING, Union, cast
from typing import List, Optional, TYPE_CHECKING, Union, cast

from ..enums import Modality

Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(
self.task_id: str = ""
self.run_id: str = ""
self._started: bool = False
self.sfreq: int
self.sfreqs: List[int]

@abstractmethod
def start(self, task_id: str, run_id: str):
Expand Down
3 changes: 2 additions & 1 deletion libbids/instruments/read_instrument.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
from abc import abstractmethod
from typing import List, Union

from .instrument import Instrument

Expand All @@ -14,7 +15,7 @@ def flush(self) -> None:
raise Exception("Method not implemented")

@abstractmethod
def read(self, remainder: bool = False) -> np.ndarray:
def read(self, remainder: bool = False) -> Union[List, np.ndarray]:
"""Read data from the headset and return the data
Parameters
Expand Down
2 changes: 1 addition & 1 deletion libbids/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def prefix(self) -> str:

@property
def sfreq(self) -> int:
return self.task.primary_instrument.sfreq
return self.task.primary_instrument.sfreqs[0]

@property
def subject_dir(self) -> Path:
Expand Down

0 comments on commit 19a2c84

Please sign in to comment.