diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml
index 462a555a..41b70c44 100644
--- a/.github/workflows/testing.yml
+++ b/.github/workflows/testing.yml
@@ -43,10 +43,13 @@ jobs:
- name: Test with pytest
run: |
# run tests separately here due to Segmentation Fault in test_clustering when run all in
- # one command with `pytest` on MacOS. Bugs not catched, so this is a trade-off to avoid SF.
- python -m pytest -rA pypots/tests/test_classification.py -n auto --cov=pypots
- python -m pytest -rA pypots/tests/test_imputation.py -n auto --cov=pypots --cov-append
- python -m pytest -rA pypots/tests/test_clustering.py -n auto --cov=pypots --cov-append
+        # one command with `pytest` on macOS. Some bugs may go uncaught, so this is a trade-off to avoid the segfault.
+ python -m pytest -rA pypots/tests/test_classification.py -n auto --cov=pypots --dist=loadgroup
+ python -m pytest -rA pypots/tests/test_imputation.py -n auto --cov=pypots --cov-append --dist=loadgroup
+ python -m pytest -rA pypots/tests/test_clustering.py -n auto --cov=pypots --cov-append --dist=loadgroup
+ python -m pytest -rA pypots/tests/test_forecasting.py -n auto --cov=pypots --cov-append --dist=loadgroup
+ python -m pytest -rA pypots/tests/test_data.py -n auto --cov=pypots --cov-append --dist=loadgroup
+ python -m pytest -rA pypots/tests/test_logging.py -n auto --cov=pypots --cov-append --dist=loadgroup
- name: Generate the LCOV report
run: |
diff --git a/CITATION.cff b/CITATION.cff
index 49eed6c0..64753889 100644
--- a/CITATION.cff
+++ b/CITATION.cff
@@ -5,7 +5,7 @@ authors:
given-names: "Wenjie"
orcid: "https://orcid.org/0000-0003-3046-7835"
title: "PyPOTS: A Python Toolbox for Data Mining on Partially-Observed Time Series"
-version: 0.0.7
-doi: 10.5281/zenodo.6823222
+version: 0.0.9
+doi: 10.5281/zenodo.6823221
date-released: 2022-07-12
url: "https://github.com/WenjieDu/PyPOTS"
\ No newline at end of file
diff --git a/README.md b/README.md
index 686b3042..615570c3 100644
--- a/README.md
+++ b/README.md
@@ -5,44 +5,46 @@
@@ -64,6 +66,9 @@ Install it with `conda install pypots`, you may need to specify the channel with
Install the latest release from PyPI:
> pip install pypots
+or install from source to get the latest features that are not yet officially released:
+> pip install https://github.com/WenjieDu/PyPOTS/archive/main.zip
+
Below is an example applying SAITS in PyPOTS to impute missing values in the dataset PhysioNet2012:
@@ -82,10 +87,11 @@ X = StandardScaler().fit_transform(X.to_numpy())
X = X.reshape(num_samples, 48, -1)
X_intact, X, missing_mask, indicating_mask = mcar(X, 0.1) # hold out 10% observed values as ground truth
X = masked_fill(X, 1 - missing_mask, np.nan)
+dataset = {"X": X}
# Model training. This is PyPOTS showtime. 💪
saits = SAITS(n_steps=48, n_features=37, n_layers=2, d_model=256, d_inner=128, n_head=4, d_k=64, d_v=64, dropout=0.1, epochs=10)
-saits.fit(X) # train the model. Here I use the whole dataset as the training set, because ground truth is not visible to the model.
-imputation = saits.impute(X) # impute the originally-missing values and artificially-missing values
+saits.fit(dataset) # train the model. Here I use the whole dataset as the training set, because ground truth is not visible to the model.
+imputation = saits.impute(dataset) # impute the originally-missing values and artificially-missing values
mae = cal_mae(imputation, X_intact, indicating_mask) # calculate mean absolute error on the ground truth (artificially-missing values)
```
@@ -112,13 +118,13 @@ author = {Wenjie Du},
title = {{PyPOTS: A Python Toolbox for Data Mining on Partially-Observed Time Series}},
howpublished = {\url{https://github.com/wenjiedu/pypots}},
year = {2022},
-doi = {10.5281/zenodo.6823222},
+doi = {10.5281/zenodo.6823221},
}
```
or
-`Wenjie Du. (2022). PyPOTS: A Python Toolbox for Data Mining on Partially-Observed Time Series. Zenodo. https://doi.org/10.5281/zenodo.6823222`
+`Wenjie Du. (2022). PyPOTS: A Python Toolbox for Data Mining on Partially-Observed Time Series. Zenodo. https://doi.org/10.5281/zenodo.6823221`
## ❖ Attention 👀
The documentation and tutorials are under construction. And a short paper introducing PyPOTS is on the way! 🚀 Stay tuned please!
diff --git a/environment.yml b/environment.yml
index 396b79b2..c1cb2024 100644
--- a/environment.yml
+++ b/environment.yml
@@ -9,10 +9,9 @@ dependencies:
- conda-forge::scipy
- conda-forge::pandas
- conda-forge::scikit-learn
- - conda-forge::matplotlib
- conda-forge::tensorboard
- conda-forge::pip
- - pytorch::pytorch==1.11.0
- - pip:
- - pycorruptor==0.0.4
- - tsdb==0.0.7
+ - conda-forge::pycorruptor
+ - conda-forge::tsdb
+ - conda-forge::h5py
+ - pytorch::pytorch==1.11.0
\ No newline at end of file
diff --git a/pypots/__version__.py b/pypots/__version__.py
index b4069ba5..c6345fc4 100644
--- a/pypots/__version__.py
+++ b/pypots/__version__.py
@@ -21,4 +21,4 @@
# Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer.
# 'X.Y.dev0' is the canonical version of 'X.Y.dev'
-version = "0.0.9"
+version = "0.0.10"
diff --git a/pypots/base.py b/pypots/base.py
index 49b1b0c2..0f2e69e4 100644
--- a/pypots/base.py
+++ b/pypots/base.py
@@ -8,7 +8,6 @@
import os
from abc import ABC
-import numpy as np
import torch
from pypots.utils.files import create_dir_if_not_exist
@@ -32,101 +31,6 @@ def __init__(self, device):
else:
self.device = device
- def check_input(
- self, expected_n_steps, expected_n_features, X, y=None, out_dtype="tensor"
- ):
- """Check value type and shape of input X and y
-
- Parameters
- ----------
- expected_n_steps : int
- Number of time steps of input time series (X) that the model expects.
- This value is the same with the argument `n_steps` used to initialize the model.
-
- expected_n_features : int
- Number of feature dimensions of input time series (X) that the model expects.
- This value is the same with the argument `n_features` used to initialize the model.
-
- X : array-like,
- Time-series data that must have a shape like [n_samples, expected_n_steps, expected_n_features].
-
- y : array-like, default=None
- Labels of time-series samples (X) that must have a shape like [n_samples] or [n_samples, n_classes].
-
- out_dtype : str, in ['tensor', 'ndarray'], default='tensor'
- Data type of the output, should be np.ndarray or torch.Tensor
-
- Returns
- -------
- X : tensor
-
- y : tensor
- """
- assert out_dtype in [
- "tensor",
- "ndarray",
- ], f'out_dtype should be "tensor" or "ndarray", but got {out_dtype}'
- is_list = isinstance(X, list)
- is_array = isinstance(X, np.ndarray)
- is_tensor = isinstance(X, torch.Tensor)
- assert is_tensor or is_array or is_list, TypeError(
- "X should be an instance of list/np.ndarray/torch.Tensor, "
- f"but got {type(X)}"
- )
-
- # convert the data type if in need
- if out_dtype == "tensor":
- if is_list:
- X = torch.tensor(X).to(self.device)
- elif is_array:
- X = torch.from_numpy(X).to(self.device)
- else: # is tensor
- X = X.to(self.device)
- else: # out_dtype is ndarray
- # convert to np.ndarray first for shape check
- if is_list:
- X = np.asarray(X)
- elif is_tensor:
- X = X.numpy()
- else: # is ndarray
- pass
-
- # check the shape of X here
- X_shape = X.shape
- assert len(X_shape) == 3, (
- f"input should have 3 dimensions [n_samples, seq_len, n_features],"
- f"but got shape={X.shape}"
- )
- assert (
- X_shape[1] == expected_n_steps
- ), f"expect X.shape[1] to be {expected_n_steps}, but got {X_shape[1]}"
- assert (
- X_shape[2] == expected_n_features
- ), f"expect X.shape[2] to be {expected_n_features}, but got {X_shape[2]}"
-
- if y is not None:
- assert len(X) == len(y), (
- f"lengths of X and y must match, " f"but got f{len(X)} and {len(y)}"
- )
- if isinstance(y, torch.Tensor):
- y = y.to(self.device) if out_dtype == "tensor" else y.numpy()
- elif isinstance(y, list):
- y = (
- torch.tensor(y).to(self.device)
- if out_dtype == "tensor"
- else np.asarray(y)
- )
- elif isinstance(y, np.ndarray):
- y = torch.from_numpy(y).to(self.device) if out_dtype == "tensor" else y
- else:
- raise TypeError(
- "y should be an instance of list/np.ndarray/torch.Tensor, "
- f"but got {type(y)}"
- )
- return X, y
- else:
- return X
-
def save_logs_to_tensorboard(self, saving_path):
"""Save logs (self.logger) into a tensorboard file.
diff --git a/pypots/classification/base.py b/pypots/classification/base.py
index 598902aa..27dcac5a 100644
--- a/pypots/classification/base.py
+++ b/pypots/classification/base.py
@@ -22,19 +22,31 @@ def __init__(self, device):
super().__init__(device)
@abstractmethod
- def fit(self, train_X, train_y, val_X=None, val_y=None):
- """Train the classifier.
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the classifier on the given data.
Parameters
----------
- train_X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data for training, can contain missing values.
- train_y : array,
- Classification labels for training.
- val_X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data for validation, can contain missing values.
- val_y : array,
- Classification labels for validation.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ val_set : dict or str,
+ The dataset for model validating, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validating, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
@@ -44,18 +56,22 @@ def fit(self, train_X, train_y, val_X=None, val_y=None):
return self
@abstractmethod
- def classify(self, X):
- """Classify the input with the trained model.
+ def classify(self, X, file_type="h5py"):
+ """Classify the input data with the trained model.
Parameters
----------
- X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data contains missing values.
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
Returns
-------
- array-like, shape [n_samples, sequence length (time steps), n_features],
- Classification results.
+ array-like, shape [n_samples],
+ Classification results of the given samples.
"""
pass
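
To make the dict-or-path input contract documented in the hunk above concrete, here is a minimal sketch (not part of the patch) of preparing a training set in both accepted forms: an in-memory dict and an h5 file containing the keys 'X' and 'y'. The file name, array shapes, and the `classifier` variable are illustrative assumptions.

```python
import numpy as np
import h5py

# Toy data: 100 samples, 48 time steps, 37 features, with some values missing.
n_samples, n_steps, n_features = 100, 48, 37
X = np.random.randn(n_samples, n_steps, n_features)
X[X < -1.5] = np.nan
y = np.random.randint(0, 2, n_samples)

# Form 1: an in-memory dict, passed directly to fit().
train_set = {"X": X, "y": y}

# Form 2: an h5 file holding the same keys; pass its path string to fit() instead.
with h5py.File("train_set.h5", "w") as hf:
    hf.create_dataset("X", data=X)
    hf.create_dataset("y", data=y)

# classifier.fit(train_set)                          # dict input
# classifier.fit("train_set.h5", file_type="h5py")   # file input
```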
diff --git a/pypots/classification/brits.py b/pypots/classification/brits.py
index 5ef03860..6cd9a959 100644
--- a/pypots/classification/brits.py
+++ b/pypots/classification/brits.py
@@ -123,8 +123,6 @@ class BRITS(BaseNNClassifier):
The underlying BRITS model.
optimizer : object,
The optimizer for model training.
- data_loader : object,
- The data loader for dataset loading.
Parameters
----------
@@ -181,37 +179,47 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X, train_y, val_X=None, val_y=None):
- """Fit the model on the given training data.
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the classifier on the given data.
Parameters
----------
- train_X : array, shape [n_samples, sequence length (time steps), n_features],
- Time-series vectors.
- train_y : array,
- Classification labels.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ val_set : dict or str,
+ The dataset for model validating, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validating, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py"
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
self : object,
- Trained model.
+ Trained classifier.
"""
- train_X, train_y = self.check_input(
- self.n_steps, self.n_features, train_X, train_y
- )
- val_X, val_y = self.check_input(self.n_steps, self.n_features, val_X, val_y)
- training_set = DatasetForBRITS(
- train_X, train_y
- ) # time_gaps is necessary for BRITS
+        training_set = DatasetForBRITS(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
- if val_X is None:
+ if val_set is None:
self._train_model(training_loader)
else:
- val_set = DatasetForBRITS(val_X, val_y)
+            val_set = DatasetForBRITS(val_set, file_type)
val_loader = DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
self._train_model(training_loader, val_loader)
@@ -325,10 +333,25 @@ def assemble_input_for_testing(self, data) -> dict:
}
return inputs
- def classify(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def classify(self, X, file_type="h5py"):
+ """Classify the input data with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples],
+ Classification results of the given samples.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = DatasetForBRITS(X)
+ test_set = DatasetForBRITS(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
prediction_collector = []
diff --git a/pypots/classification/grud.py b/pypots/classification/grud.py
index 69929dcc..fb13df4f 100644
--- a/pypots/classification/grud.py
+++ b/pypots/classification/grud.py
@@ -145,35 +145,47 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X, train_y, val_X=None, val_y=None):
- """Fit the model on the given training data.
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the classifier on the given data.
Parameters
----------
- train_X : array, shape [n_samples, sequence length (time steps), n_features],
- Time-series vectors.
- train_y : array,
- Classification labels.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ val_set : dict or str,
+ The dataset for model validating, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validating, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py"
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
self : object,
- Trained model.
+ Trained classifier.
"""
- train_X, train_y = self.check_input(
- self.n_steps, self.n_features, train_X, train_y
- )
- val_X, val_y = self.check_input(self.n_steps, self.n_features, val_X, val_y)
- training_set = DatasetForGRUD(train_X, train_y)
+ training_set = DatasetForGRUD(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
- if val_X is None:
+ if val_set is None:
self._train_model(training_loader)
else:
- val_set = DatasetForGRUD(val_X, val_y)
+            val_set = DatasetForGRUD(val_set, file_type)
val_loader = DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
self._train_model(training_loader, val_loader)
@@ -259,10 +271,25 @@ def assemble_input_for_testing(self, data) -> dict:
return inputs
- def classify(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def classify(self, X, file_type="h5py"):
+ """Classify the input data with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples],
+ Classification results of the given samples.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = DatasetForGRUD(X)
+ test_set = DatasetForGRUD(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
prediction_collector = []
diff --git a/pypots/classification/raindrop.py b/pypots/classification/raindrop.py
index c6204bc5..31220608 100644
--- a/pypots/classification/raindrop.py
+++ b/pypots/classification/raindrop.py
@@ -666,35 +666,47 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X, train_y, val_X=None, val_y=None):
+ def fit(self, train_set, val_set=None, file_type="h5py"):
"""Fit the model on the given training data.
Parameters
----------
- train_X : array, shape [n_samples, sequence length (time steps), n_features],
- Time-series vectors.
- train_y : array,
- Classification labels.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ val_set : dict or str,
+ The dataset for model validating, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validating, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py"
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
self : object,
Trained model.
"""
- train_X, train_y = self.check_input(
- self.n_steps, self.n_features, train_X, train_y
- )
- val_X, val_y = self.check_input(self.n_steps, self.n_features, val_X, val_y)
- training_set = DatasetForGRUD(train_X, train_y)
+        training_set = DatasetForGRUD(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
- if val_X is None:
+ if val_set is None:
self._train_model(training_loader)
else:
- val_set = DatasetForGRUD(val_X, val_y)
+            val_set = DatasetForGRUD(val_set, file_type)
val_loader = DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
self._train_model(training_loader, val_loader)
@@ -788,10 +800,25 @@ def assemble_input_for_testing(self, data) -> dict:
return inputs
- def classify(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def classify(self, X, file_type="h5py"):
+ """Classify the input data with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples],
+ Classification results of the given samples.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = DatasetForGRUD(X)
+ test_set = DatasetForGRUD(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
prediction_collector = []
diff --git a/pypots/clustering/base.py b/pypots/clustering/base.py
index f3cc8c2e..8b66eb35 100644
--- a/pypots/clustering/base.py
+++ b/pypots/clustering/base.py
@@ -22,13 +22,21 @@ def __init__(self, device):
super().__init__(device)
@abstractmethod
- def fit(self, train_X):
+ def fit(self, train_set, file_type="h5py"):
"""Train the cluster.
Parameters
----------
- train_X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data for training, can contain missing values.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py"
+ The type of the given file if train_set is a path string.
Returns
-------
@@ -38,17 +46,21 @@ def fit(self, train_X):
return self
@abstractmethod
- def cluster(self, X):
+ def cluster(self, X, file_type="h5py"):
"""Cluster the input with the trained model.
Parameters
----------
- X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data contains missing values.
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py"
+ The type of the given file if X is a path string.
Returns
-------
- array-like, shape [n_samples, sequence length (time steps), n_features],
+ array-like, shape [n_samples],
Clustering results.
"""
pass
diff --git a/pypots/clustering/crli.py b/pypots/clustering/crli.py
index b0bd9723..b062fc33 100644
--- a/pypots/clustering/crli.py
+++ b/pypots/clustering/crli.py
@@ -352,9 +352,28 @@ def __init__(
self._print_model_size()
self.logger = {"training_loss_generator": [], "training_loss_discriminator": []}
- def fit(self, train_X):
- train_X = self.check_input(self.n_steps, self.n_features, train_X)
- training_set = DatasetForGRUD(train_X)
+ def fit(self, train_set, file_type="h5py"):
+        """Train the clustering model on the given data.
+
+ Parameters
+ ----------
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py"
+ The type of the given file if train_set is a path string.
+
+ Returns
+ -------
+ self : object,
+            Trained clusterer.
+ """
+ training_set = DatasetForGRUD(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
@@ -515,10 +534,25 @@ def _train_model(self, training_loader, val_loader=None):
logger.info("Finished training.")
- def cluster(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def cluster(self, X, file_type="h5py"):
+ """Cluster the input with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py"
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples],
+ Clustering results.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = DatasetForGRUD(X)
+ test_set = DatasetForGRUD(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
latent_collector = []
diff --git a/pypots/clustering/vader.py b/pypots/clustering/vader.py
index 14f682fe..9a7a0e1f 100644
--- a/pypots/clustering/vader.py
+++ b/pypots/clustering/vader.py
@@ -103,7 +103,7 @@ def set_values(self, mu, var, phi):
assert phi.shape == self.phi_c_unscaled.shape
self.mu_c_unscaled = torch.nn.Parameter(mu)
self.var_c_unscaled = torch.nn.Parameter(var)
- self.phi_c_unscaled = torch.tensor(phi)
+ self.phi_c_unscaled = phi
def forward(self):
mu_c = self.mu_c_unscaled
@@ -293,6 +293,7 @@ def forward(self, inputs, pretrain=False):
ii, jj = torch.meshgrid(
torch.arange(self.n_clusters, dtype=torch.int64, device=device),
torch.arange(batch_size, dtype=torch.int64, device=device),
+ indexing="ij",
)
ii = ii.flatten()
jj = jj.flatten()
@@ -378,9 +379,28 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X):
- train_X = self.check_input(self.n_steps, self.n_features, train_X)
- training_set = DatasetForGRUD(train_X)
+ def fit(self, train_set, file_type="h5py"):
+        """Train the clustering model on the given data.
+
+ Parameters
+ ----------
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py"
+ The type of the given file if train_set is a path string.
+
+ Returns
+ -------
+ self : object,
+            Trained clusterer.
+ """
+ training_set = DatasetForGRUD(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
@@ -557,10 +577,25 @@ def _train_model(self, training_loader, val_loader=None):
logger.info("Finished training.")
- def cluster(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def cluster(self, X, file_type="h5py"):
+ """Cluster the input with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. h5 file.
+
+ file_type : str, default = "h5py"
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples],
+ Clustering results.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = DatasetForGRUD(X)
+ test_set = DatasetForGRUD(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
clustering_results_collector = []
diff --git a/pypots/data/base.py b/pypots/data/base.py
index 827b5d93..f0303839 100644
--- a/pypots/data/base.py
+++ b/pypots/data/base.py
@@ -5,46 +5,180 @@
# Created by Wenjie Du
# License: GPL-v3
-from torch.utils.data import Dataset
+from abc import abstractmethod
+
+import numpy as np
import torch
+from torch.utils.data import Dataset
+
+# Currently we only support h5 files
+SUPPORTED_DATASET_FILE_TYPE = ["h5py"]
class BaseDataset(Dataset):
"""Base dataset class in PyPOTS.
-    Parameters
-    ----------
-    X : tensor, shape of [n_samples, n_steps, n_features]
-        Time-series feature vector.
+    Parameters
+    ----------
+    data : dict or str,
+ The dataset for model input, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for input, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
- y : tensor, shape of [n_samples], optional, default=None,
- Classification labels of according time-series samples.
+ file_type : str, default = "h5py"
+        The type of the given file if data is a path string.
"""
- def __init__(self, X, y=None):
+ def __init__(self, data, file_type="h5py"):
super().__init__()
# types and shapes had been checked after X and y input into the model
# So they are safe to use here. No need to check again.
- self.X = X
- self.y = y
- self.n_steps = self.X.shape[1]
- self.n_features = self.X.shape[2]
+
+ self.data = data
+ if isinstance(self.data, str): # data from file
+ self.file_type = file_type
+
+ # check if the given file type is supported
+ assert (
+ file_type in SUPPORTED_DATASET_FILE_TYPE
+ ), f"file_type should be one of {SUPPORTED_DATASET_FILE_TYPE}, but got {file_type}"
+
+ # open the file handle
+ self.file_handle = self._open_file_handle()
+ # check if X exists in the file
+ assert (
+ "X" in self.file_handle.keys()
+            ), "The given dataset file doesn't contain the key 'X'. Please double check."
+
+ else: # data from array
+ X = data["X"]
+ y = None if "y" not in data.keys() else data["y"]
+ self.X, self.y = self.check_input(X, y)
+
+ self.sample_num = self._get_sample_num()
+
+ # set up function fetch_data()
+ if isinstance(self.data, str):
+ self.fetch_data = self._fetch_data_from_file
+ else:
+ self.fetch_data = self._fetch_data_from_array
+
+ def _get_sample_num(self):
+ """Determine the number of samples in the dataset and return the number.
+
+ Returns
+ -------
+ sample_num : int
+ The number of the samples in the given dataset.
+ """
+ if isinstance(self.data, str):
+ if self.file_handle is None:
+ self.file_handle = self._open_file_handle()
+ sample_num = len(self.file_handle["X"])
+ else:
+ sample_num = len(self.X)
+
+ return sample_num
def __len__(self):
- return len(self.X)
+ return self.sample_num
- def __getitem__(self, idx):
- """Fetch data according to index.
+ @staticmethod
+ def check_input(X, y=None, out_dtype="tensor"):
+ """Check value type and shape of input X and y
+
+ Parameters
+ ----------
+ X : array-like,
+ Time-series data that must have a shape like [n_samples, expected_n_steps, expected_n_features].
+
+ y : array-like, default=None
+ Labels of time-series samples (X) that must have a shape like [n_samples] or [n_samples, n_classes].
+
+ out_dtype : str, in ['tensor', 'ndarray'], default='tensor'
+ Data type of the output, should be np.ndarray or torch.Tensor
+
+ Returns
+ -------
+ X : array-like
+
+ y : array-like
+ """
+ assert out_dtype in [
+ "tensor",
+ "ndarray",
+ ], f'out_dtype should be "tensor" or "ndarray", but got {out_dtype}'
+
+ is_list = isinstance(X, list)
+ is_array = isinstance(X, np.ndarray)
+ is_tensor = isinstance(X, torch.Tensor)
+ assert is_tensor or is_array or is_list, TypeError(
+ "X should be an instance of list/np.ndarray/torch.Tensor, "
+ f"but got {type(X)}"
+ )
+
+ # convert the data type if in need
+ if out_dtype == "tensor":
+ if is_list:
+ X = torch.tensor(X)
+ elif is_array:
+ X = torch.from_numpy(X)
+ else: # is tensor
+ pass
+ else: # out_dtype is ndarray
+ # convert to np.ndarray first for shape check
+ if is_list:
+ X = np.asarray(X)
+ elif is_tensor:
+ X = X.numpy()
+ else: # is ndarray
+ pass
+
+ # check the shape of X here
+ X_shape = X.shape
+ assert len(X_shape) == 3, (
+            f"input should have 3 dimensions [n_samples, seq_len, n_features], "
+ f"but got shape={X_shape}"
+ )
+
+ if y is not None:
+ assert len(X) == len(y), (
+                f"lengths of X and y must match, but got {len(X)} and {len(y)}"
+ )
+ if isinstance(y, torch.Tensor):
+ y = y if out_dtype == "tensor" else y.numpy()
+ elif isinstance(y, list):
+ y = torch.tensor(y) if out_dtype == "tensor" else np.asarray(y)
+ elif isinstance(y, np.ndarray):
+ y = torch.from_numpy(y) if out_dtype == "tensor" else y
+ else:
+ raise TypeError(
+ "y should be an instance of list/np.ndarray/torch.Tensor, "
+ f"but got {type(y)}"
+ )
+
+ return X, y
+
+ @abstractmethod
+ def _fetch_data_from_array(self, idx):
+ """Fetch data from self.X if it is given.
Parameters
----------
idx : int,
- The index to fetch the specified sample.
+            The index of the sample to be returned.
+
+ Returns
+ -------
+ sample : list,
+ The collated data sample, a list including all necessary sample info.
"""
+
X = self.X[idx]
missing_mask = ~torch.isnan(X)
X = torch.nan_to_num(X)
-
sample = [
torch.tensor(idx),
X.to(torch.float32),
@@ -55,3 +189,98 @@ def __getitem__(self, idx):
sample.append(self.y[idx].to(torch.long))
return sample
+
+ def _open_file_handle(self):
+ """Open the file handle for reading data from the file.
+
+ Notes
+ -----
+ This function can also help confirm if the given file and file type match.
+
+ Returns
+ -------
+ file_handle : file
+
+ """
+ data_file_path = self.data
+ try:
+ import h5py
+
+ file_handler = h5py.File(
+ data_file_path, "r", swmr=True
+            )  # swmr=True allows reading the h5 file even while it may still be written to
+ except ImportError:
+ raise ImportError(
+ "h5py is missing and cannot be imported. Please install it first."
+ )
+ except OSError as e:
+ raise TypeError(
+                f"{e} This is probably caused by a wrong file type. "
+ f"Please confirm that the given file {data_file_path} is an h5 file."
+ )
+ except Exception as e:
+ raise RuntimeError(e)
+ return file_handler
+
+ @abstractmethod
+ def _fetch_data_from_file(self, idx):
+        """Fetch data with the lazy-loading strategy, i.e. only load data from the file when samples are requested.
+        The opened file handle doesn't load the entire dataset into RAM, but only the currently accessed slice.
+
+ Notes
+ -----
+ Multi workers reading from h5 file is tricky, and I was confronted with a problem similar to
+ https://discuss.pytorch.org/t/dataloader-when-num-worker-0-there-is-bug/25643/7 in 2020, please
+ refer to it for more details about the problem.
+ The implementation here is referred to
+ https://discuss.pytorch.org/t/dataloader-when-num-worker-0-there-is-bug/25643/10
+ And according to https://discuss.pytorch.org/t/dataloader-when-num-worker-0-there-is-bug/25643/37,
+ pytorch v1.7.1 and h5py v3.2.0 work well, so probably updating to the latest version can avoid the
+ issue I met. After all, this implementation may need to be updated in the near future.
+
+ Parameters
+ ----------
+ idx : int,
+            The index of the sample to be returned.
+
+ Returns
+ -------
+ sample : list,
+ The collated data sample, a list including all necessary sample info.
+ """
+
+ if self.file_handle is None:
+ self.file_handle = self._open_file_handle()
+
+ X = torch.from_numpy(self.file_handle["X"][idx])
+ missing_mask = ~torch.isnan(X)
+ X = torch.nan_to_num(X)
+ sample = [
+ torch.tensor(idx),
+ X.to(torch.float32),
+ missing_mask.to(torch.float32),
+ ]
+
+ if (
+ "y" in self.file_handle.keys()
+ ): # if the dataset has labels, then fetch it from the file
+            sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long))
+
+ return sample
+
+ def __getitem__(self, idx):
+ """Fetch data according to index.
+
+ Parameters
+ ----------
+ idx : int,
+ The index to fetch the specified sample.
+
+ Returns
+ -------
+ sample : list,
+ The collated data sample, a list including all necessary sample info.
+ """
+
+ sample = self.fetch_data(idx)
+ return sample
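
As a rough usage sketch of the lazy-loading path described in the notes above, assuming the h5 file from the earlier sketch exists and that `DatasetForGRUD` is importable from `pypots.data.dataset_for_grud` as defined in this patch: the file handle is opened on first access and each `__getitem__` call reads only the requested slice from disk.

```python
from torch.utils.data import DataLoader
from pypots.data.dataset_for_grud import DatasetForGRUD

# The path string is only stored here; the h5 handle is opened lazily on first access.
dataset = DatasetForGRUD("train_set.h5", file_type="h5py")
loader = DataLoader(dataset, batch_size=32, shuffle=True)

for batch in loader:
    # batch collates the sample list built in _fetch_data_from_file: index, X,
    # X_filledLOCF, missing_mask, deltas, empirical_mean, plus y if the file has labels.
    break
```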
diff --git a/pypots/data/dataset_for_brits.py b/pypots/data/dataset_for_brits.py
index 087bdba8..eb360583 100644
--- a/pypots/data/dataset_for_brits.py
+++ b/pypots/data/dataset_for_brits.py
@@ -5,6 +5,7 @@
# Created by Wenjie Du
# License: GLP-v3
+import numpy as np
import torch
from pypots.data.base import BaseDataset
@@ -15,7 +16,7 @@ def parse_delta(missing_mask):
Parameters
----------
- missing_mask : tensor, shape of [n_samples, n_steps, n_features]
+ missing_mask : tensor, shape of [n_steps, n_features] or [n_samples, n_steps, n_features]
Binary masks indicate missing values.
Returns
@@ -24,75 +25,128 @@ def parse_delta(missing_mask):
Delta matrix indicates time gaps of missing values.
Its math definition please refer to :cite:`che2018GRUD`.
"""
- # missing_mask is from X, and X's shape and type had been checked. So no need to double-check here.
- n_samples, n_steps, n_features = missing_mask.shape
- device = missing_mask.device
- delta_collector = []
- for m_mask in missing_mask:
- delta = []
+
+ def cal_delta_for_single_sample(mask):
+ d = [] # single sample's delta
for step in range(n_steps):
if step == 0:
- delta.append(torch.zeros(1, n_features, device=device))
+ d.append(torch.zeros(1, n_features, device=device))
else:
- delta.append(
- torch.ones(1, n_features, device=device)
- + (1 - m_mask[step]) * delta[-1]
+ d.append(
+ torch.ones(1, n_features, device=device) + (1 - mask[step]) * d[-1]
)
- delta = torch.concat(delta, dim=0)
- delta_collector.append(delta.unsqueeze(0))
- delta = torch.concat(delta_collector, dim=0)
+ d = torch.concat(d, dim=0)
+ return d
+
+ # missing_mask is from X, and X's shape and type had been checked. So no need to double-check here.
+ device = missing_mask.device
+ if len(missing_mask.shape) == 2:
+ n_steps, n_features = missing_mask.shape
+ delta = cal_delta_for_single_sample(missing_mask)
+ else:
+ n_samples, n_steps, n_features = missing_mask.shape
+ delta_collector = []
+ for m_mask in missing_mask:
+ delta = cal_delta_for_single_sample(m_mask)
+ delta_collector.append(delta.unsqueeze(0))
+ delta = torch.concat(delta_collector, dim=0)
+
return delta
-class DatasetForBRITS(BaseDataset):
- """Dataset class for BRITS.
+def parse_delta_np(missing_mask):
+ """Generate time-gap (delta) matrix from missing masks.
Parameters
----------
- X : tensor, shape of [n_samples, n_steps, n_features]
- Time-series data.
+    missing_mask : array, shape of [seq_len, n_features] or [n_samples, seq_len, n_features]
+ Binary masks indicate missing values.
- y : tensor, shape of [n_samples], optional, default=None,
- Classification labels of according time-series samples.
+ Returns
+ -------
+ delta, array,
+ Delta matrix indicates time gaps of missing values.
+ Its math definition please refer to :cite:`che2018MissingData`.
"""
- def __init__(self, X, y=None):
- super().__init__(X, y)
-
- # calculate all delta here.
- # Training will take too much time if we put delta calculation in __getitem__().
- forward_missing_mask = (~torch.isnan(X)).type(torch.float32)
- forward_X = torch.nan_to_num(X)
- forward_delta = parse_delta(forward_missing_mask)
- backward_X = torch.flip(forward_X, dims=[1])
- backward_missing_mask = torch.flip(forward_missing_mask, dims=[1])
- backward_delta = parse_delta(backward_missing_mask)
-
- self.data = {
- "forward": {
- "X": forward_X,
- "missing_mask": forward_missing_mask,
- "delta": forward_delta,
- },
- "backward": {
- "X": backward_X,
- "missing_mask": backward_missing_mask,
- "delta": backward_delta,
- },
- }
+ def cal_delta_for_single_sample(mask):
+ d = []
+ for step in range(seq_len):
+ if step == 0:
+ d.append(np.zeros(n_features))
+ else:
+ d.append(np.ones(n_features) + (1 - mask[step]) * d[-1])
+ d = np.asarray(d)
+ return d
+
+ if len(missing_mask.shape) == 2:
+ seq_len, n_features = missing_mask.shape
+ delta = cal_delta_for_single_sample(missing_mask)
+ else:
+ n_samples, seq_len, n_features = missing_mask.shape
+ delta_collector = []
+ for m_mask in missing_mask:
+ delta = cal_delta_for_single_sample(m_mask)
+ delta_collector.append(delta)
+ delta = np.asarray(delta_collector)
+ return delta
+
- def __getitem__(self, idx):
- """Fetch data according to index.
+class DatasetForBRITS(BaseDataset):
+ """Dataset class for BRITS.
+
+ Parameters
+ ----------
+ data : dict or str,
+ The dataset for model input, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for input, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py"
+        The type of the given file if data is a path string.
+ """
+
+ def __init__(self, data, file_type="h5py"):
+ super().__init__(data, file_type)
+
+ if not isinstance(self.data, str):
+ # calculate all delta here.
+ forward_missing_mask = (~torch.isnan(self.X)).type(torch.float32)
+ forward_X = torch.nan_to_num(self.X)
+ forward_delta = parse_delta(forward_missing_mask)
+ backward_X = torch.flip(forward_X, dims=[1])
+ backward_missing_mask = torch.flip(forward_missing_mask, dims=[1])
+ backward_delta = parse_delta(backward_missing_mask)
+
+ self.processed_data = {
+ "forward": {
+ "X": forward_X,
+ "missing_mask": forward_missing_mask,
+ "delta": forward_delta,
+ },
+ "backward": {
+ "X": backward_X,
+ "missing_mask": backward_missing_mask,
+ "delta": backward_delta,
+ },
+ }
+
+ def _fetch_data_from_array(self, idx):
+ """Fetch data from self.X if it is given.
Parameters
----------
idx : int,
- The index to fetch the specified sample.
+            The index of the sample to be returned.
Returns
-------
- dict,
- A dict contains
+ sample : list,
+ A list contains
index : int tensor,
The index of the sample.
@@ -112,16 +166,69 @@ def __getitem__(self, idx):
sample = [
torch.tensor(idx),
# for forward
- self.data["forward"]["X"][idx].to(torch.float32),
- self.data["forward"]["missing_mask"][idx].to(torch.float32),
- self.data["forward"]["delta"][idx].to(torch.float32),
+ self.processed_data["forward"]["X"][idx].to(torch.float32),
+ self.processed_data["forward"]["missing_mask"][idx].to(torch.float32),
+ self.processed_data["forward"]["delta"][idx].to(torch.float32),
# for backward
- self.data["backward"]["X"][idx].to(torch.float32),
- self.data["backward"]["missing_mask"][idx].to(torch.float32),
- self.data["backward"]["delta"][idx].to(torch.float32),
+ self.processed_data["backward"]["X"][idx].to(torch.float32),
+ self.processed_data["backward"]["missing_mask"][idx].to(torch.float32),
+ self.processed_data["backward"]["delta"][idx].to(torch.float32),
]
if self.y is not None:
sample.append(self.y[idx].to(torch.long))
return sample
+
+ def _fetch_data_from_file(self, idx):
+        """Fetch data with the lazy-loading strategy, i.e. only load data from the file when samples are requested.
+        The opened file handle doesn't load the entire dataset into RAM, but only the currently accessed slice.
+
+ Parameters
+ ----------
+ idx : int,
+            The index of the sample to be returned.
+
+ Returns
+ -------
+ sample : list,
+ The collated data sample, a list including all necessary sample info.
+ """
+
+ if self.file_handle is None:
+ self.file_handle = self._open_file_handle()
+
+ X = torch.from_numpy(self.file_handle["X"][idx])
+ missing_mask = (~torch.isnan(X)).to(torch.float32)
+ X = torch.nan_to_num(X)
+
+ forward = {
+ "X": X,
+ "missing_mask": missing_mask,
+ "deltas": parse_delta(missing_mask),
+ }
+
+ backward = {
+ "X": torch.flip(forward["X"], dims=[0]),
+ "missing_mask": torch.flip(forward["missing_mask"], dims=[0]),
+ }
+ backward["deltas"] = parse_delta(backward["missing_mask"])
+
+ sample = [
+ torch.tensor(idx),
+ # for forward
+ forward["X"],
+ forward["missing_mask"],
+ forward["deltas"],
+ # for backward
+ backward["X"],
+ backward["missing_mask"],
+ backward["deltas"],
+ ]
+
+ if (
+ "y" in self.file_handle.keys()
+ ): # if the dataset has labels, then fetch it from the file
+ sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long))
+
+ return sample
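
A tiny worked example may help with the time-gap (delta) recursion implemented by `parse_delta` / `parse_delta_np` above: delta[0] = 0 and delta[t] = 1 + (1 - mask[t]) * delta[t-1], so the gap keeps growing for a feature while it stays missing and resets to 1 once it is observed again. The numbers below are illustrative only.

```python
import numpy as np
from pypots.data.dataset_for_brits import parse_delta_np

# 1 = observed, 0 = missing; one sample of shape [n_steps=4, n_features=2]
mask = np.array([[1, 1],
                 [0, 1],
                 [0, 0],
                 [1, 1]], dtype=float)

print(parse_delta_np(mask))
# Expected time gaps:
# [[0. 0.]
#  [1. 1.]
#  [2. 2.]
#  [1. 1.]]
```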
diff --git a/pypots/data/dataset_for_grud.py b/pypots/data/dataset_for_grud.py
index f3dd1d80..77f4f5f1 100644
--- a/pypots/data/dataset_for_grud.py
+++ b/pypots/data/dataset_for_grud.py
@@ -18,26 +18,34 @@ class DatasetForGRUD(BaseDataset):
Parameters
----------
- X : tensor, shape of [n_samples, seq_len, n_features]
- Time-series feature vector.
-
- y : tensor, shape of [n_samples], optional, default=None,
- Classification labels of according time-series samples.
+ data : dict or str,
+ The dataset for model input, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for input, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py"
+        The type of the given file if data is a path string.
"""
- def __init__(self, X, y=None):
- super().__init__(X, y)
+ def __init__(self, data, file_type="h5py"):
+ super().__init__(data, file_type)
self.locf = LOCF()
- self.missing_mask = (~torch.isnan(X)).to(torch.float32)
- self.X = torch.nan_to_num(X)
- self.deltas = parse_delta(self.missing_mask)
- self.X_filledLOCF = self.locf.locf_torch(X)
- self.empirical_mean = torch.sum(
- self.missing_mask * self.X, dim=[0, 1]
- ) / torch.sum(self.missing_mask, dim=[0, 1])
-
- def __getitem__(self, idx):
+
+ if not isinstance(self.data, str): # data from array
+ self.missing_mask = (~torch.isnan(self.X)).to(torch.float32)
+ self.X_filledLOCF = self.locf.locf_torch(self.X)
+ self.X = torch.nan_to_num(self.X)
+ self.deltas = parse_delta(self.missing_mask)
+ self.empirical_mean = torch.sum(
+ self.missing_mask * self.X, dim=[0, 1]
+ ) / torch.sum(self.missing_mask, dim=[0, 1])
+
+ def _fetch_data_from_array(self, idx):
"""Fetch data according to index.
Parameters
@@ -47,8 +55,8 @@ def __getitem__(self, idx):
Returns
-------
- dict,
- A dict contains
+ sample : list,
+ A list contains
index : int tensor,
The index of the sample.
@@ -81,3 +89,46 @@ def __getitem__(self, idx):
sample.append(self.y[idx].to(torch.long))
return sample
+
+ def _fetch_data_from_file(self, idx):
+        """Fetch data with the lazy-loading strategy, i.e. only load data from the file when samples are requested.
+        The opened file handle doesn't load the entire dataset into RAM, but only the currently accessed slice.
+
+ Parameters
+ ----------
+ idx : int,
+            The index of the sample to be returned.
+
+ Returns
+ -------
+ sample : list,
+ The collated data sample, a list including all necessary sample info.
+ """
+
+ if self.file_handle is None:
+ self.file_handle = self._open_file_handle()
+
+ X = torch.from_numpy(self.file_handle["X"][idx])
+ missing_mask = (~torch.isnan(X)).to(torch.float32)
+ X_filledLOCF = self.locf.locf_torch(X.unsqueeze(dim=0)).squeeze()
+ X = torch.nan_to_num(X)
+ deltas = parse_delta(missing_mask)
+ empirical_mean = torch.sum(missing_mask * X, dim=[0]) / torch.sum(
+ missing_mask, dim=[0]
+ )
+
+ sample = [
+ torch.tensor(idx),
+ X,
+ X_filledLOCF,
+ missing_mask,
+ deltas,
+ empirical_mean,
+ ]
+
+ if (
+ "y" in self.file_handle.keys()
+ ): # if the dataset has labels, then fetch it from the file
+ sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long))
+
+ return sample
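
The per-sample branch above applies LOCF to a single sample by temporarily adding a batch dimension (`unsqueeze` then `squeeze`), since `locf_torch` expects batched input. Below is a minimal illustration of that trick; the `LOCF` import path and the handling of a leading NaN are assumptions, not guarantees from this patch.

```python
import torch
from pypots.imputation import LOCF  # import path assumed

x = torch.tensor([[1.0, float("nan")],
                  [float("nan"), 2.0],
                  [3.0, float("nan")]])  # one sample: [n_steps=3, n_features=2]

locf = LOCF()
x_filled = locf.locf_torch(x.unsqueeze(dim=0)).squeeze()
# Feature 0 becomes [1, 1, 3]: the missing step 1 carries the last observation forward.
# Feature 1's trailing NaN becomes 2; how the leading NaN at step 0 is filled depends
# on LOCF's first-step strategy and is not assumed here.
```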
diff --git a/pypots/data/dataset_for_mit.py b/pypots/data/dataset_for_mit.py
index 0edd8a88..7dfc4e4c 100644
--- a/pypots/data/dataset_for_mit.py
+++ b/pypots/data/dataset_for_mit.py
@@ -18,29 +18,32 @@ class DatasetForMIT(BaseDataset):
Parameters
----------
- X : tensor, shape of [n_samples, n_steps, n_features]
- Time-series feature vector.
-
- y : tensor, shape of [n_samples], optional, default=None,
- Classification labels of according time-series samples.
+ data : dict or str,
+ The dataset for model input, should be a dictionary including keys as 'X' and 'y',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for input, can contain missing values, and y should be array-like of shape
+ [n_samples], which is classification labels of X.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include keys as 'X' and 'y'.
+
+ file_type : str, default = "h5py"
+        The type of the given file if data is a path string.
rate : float, in (0,1),
Artificially missing rate, rate of the observed values which will be artificially masked as missing.
-
- Note that,
- `rate` = (number of artificially missing values) / np.sum(~np.isnan(self.data)),
+ Note that, `rate` = (number of artificially missing values) / np.sum(~np.isnan(self.data)),
not (number of artificially missing values) / np.product(self.data.shape),
considering that the given data may already contain missing values,
the latter way may be confusing because if the original missing rate >= `rate`,
the function will do nothing, i.e. it won't play the role it has to be.
-
"""
- def __init__(self, X, y=None, rate=0.2):
- super().__init__(X, y)
+ def __init__(self, data, file_type="h5py", rate=0.2):
+ super().__init__(data, file_type)
self.rate = rate
- def __getitem__(self, idx):
+ def _fetch_data_from_array(self, idx):
"""Fetch data according to index.
Parameters
@@ -50,8 +53,8 @@ def __getitem__(self, idx):
Returns
-------
- dict,
- A dict contains
+ sample : list,
+ A list contains
index : int tensor,
The index of the sample.
@@ -83,3 +86,39 @@ def __getitem__(self, idx):
sample.append(self.y[idx].to(torch.long))
return sample
+
+ def _fetch_data_from_file(self, idx):
+        """Fetch data with the lazy-loading strategy, i.e. only load data from the file when samples are requested.
+        The opened file handle doesn't load the entire dataset into RAM, but only the currently accessed slice.
+
+ Parameters
+ ----------
+ idx : int,
+            The index of the sample to be returned.
+
+ Returns
+ -------
+ sample : list,
+ The collated data sample, a list including all necessary sample info.
+ """
+
+ if self.file_handle is None:
+ self.file_handle = self._open_file_handle()
+
+ X = torch.from_numpy(self.file_handle["X"][idx])
+ X_intact, X, missing_mask, indicating_mask = mcar(X, rate=self.rate)
+
+ sample = [
+ torch.tensor(idx),
+ X_intact.to(torch.float32),
+ X.to(torch.float32),
+ missing_mask.to(torch.float32),
+ indicating_mask.to(torch.float32),
+ ]
+
+ if (
+ "y" in self.file_handle.keys()
+ ): # if the dataset has labels, then fetch it from the file
+ sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long))
+
+ return sample
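
Because the `rate` documented above is defined relative to the observed values rather than to all entries, a quick sanity check can make the difference concrete. This sketch assumes `mcar` is importable from the pycorruptor dependency with the same call signature used in the README example; shapes are arbitrary.

```python
import numpy as np
from pycorruptor import mcar  # dependency of PyPOTS; import path assumed

X = np.random.randn(16, 48, 37)
X[X < -1.0] = np.nan                       # the data already contains missing values
n_observed = np.sum(~np.isnan(X))

X_intact, X_corrupted, missing_mask, indicating_mask = mcar(X, 0.2)
# indicating_mask marks the artificially masked entries, so this ratio should be
# close to 0.2 of the observed values, not of all entries.
print(indicating_mask.sum() / n_observed)
```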
diff --git a/pypots/forecasting/base.py b/pypots/forecasting/base.py
index 282b0336..5423657c 100644
--- a/pypots/forecasting/base.py
+++ b/pypots/forecasting/base.py
@@ -22,13 +22,29 @@ def __init__(self, device):
super().__init__(device)
@abstractmethod
- def fit(self, train_X):
- """Train the cluster.
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+        """Train the forecaster on the given data.
Parameters
----------
- train_X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data for training, can contain missing values.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training, can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ val_set : dict or str,
+ The dataset for model validating, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validation, can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. a h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
@@ -38,7 +54,7 @@ def fit(self, train_X):
return self
@abstractmethod
- def forecast(self, X):
+ def forecast(self, X, file_type="h5py"):
"""Forecast the future the input with the trained model.
Parameters
@@ -46,6 +62,9 @@ def forecast(self, X):
X : array-like of shape [n_samples, sequence length (time steps), n_features],
Time-series data containing missing values.
+ file_type : str, default = "h5py"
+ The type of the given file if X is a path string.
+
Returns
-------
array-like, shape [n_samples, prediction_horizon, n_features],
diff --git a/pypots/forecasting/bttf.py b/pypots/forecasting/bttf.py
index 03711d5f..4bcd1cf2 100644
--- a/pypots/forecasting/bttf.py
+++ b/pypots/forecasting/bttf.py
@@ -458,11 +458,31 @@ def __init__(
self.burn_iter = burn_iter
self.gibbs_iter = gibbs_iter
- def fit(self, train_X):
+ def fit(self, train_set, val_set=None, file_type="h5py"):
warnings.warn("Please run func forecast(X) directly.")
- def forecast(self, X):
- self.check_input(self.n_steps, self.n_features, X, out_dtype="ndarray")
+ def forecast(self, X, file_type="h5py"):
+        """Forecast the future of the input with the trained model.
+
+ Parameters
+ ----------
+        X : dict or str,
+            The data samples for testing, should be a dictionary including the key 'X' whose value is array-like of
+            shape [n_samples, sequence length (time steps), n_features]. File input is not accepted by BTTF yet.
+
+ file_type : str, default = "h5py"
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples, prediction_horizon, n_features],
+ Forecasting results.
+ """
+ assert not isinstance(
+ X, str
+ ), "BTTF so far does not accept file input. It needs a specified Dataset class."
+
+ X = X["X"]
X = X.transpose((0, 2, 1))
pred = BTTF_forecast(
diff --git a/pypots/imputation/base.py b/pypots/imputation/base.py
index e62ae50c..dbb70d9c 100644
--- a/pypots/imputation/base.py
+++ b/pypots/imputation/base.py
@@ -28,31 +28,49 @@ def __init__(self, device):
super().__init__(device)
@abstractmethod
- def fit(self, train_X, val_X=None):
- """Train the imputer.
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the imputer on the given data.
Parameters
----------
- train_X : array-like, shape: [n_samples, sequence length (time steps), n_features],
- Time-series data for training, can contain missing values.
- val_X : array-like, optional, shape [n_samples, sequence length (time steps), n_features],
- Time-series data for validating, can contain missing values.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ val_set : dict or str,
+ The dataset for model validation, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validation and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
self : object,
- Trained imputer.
+ The trained imputer.
"""
return self
@abstractmethod
- def impute(self, X):
- """Impute missing data with the trained model.
+ def impute(self, X, file_type="h5py"):
+ """Impute missing values in the given data with the trained model.
Parameters
----------
- X : array-like of shape [n_samples, sequence length (time steps), n_features],
- Time-series data for imputing contains missing values.
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. an h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
Returns
-------
@@ -164,11 +182,11 @@ def _train_model(
with torch.no_grad():
for idx, data in enumerate(val_loader):
inputs = self.assemble_input_for_validating(data)
- results = self.model.forward(inputs)
- imputation_collector.append(results["imputed_data"])
+ imputed_data, _ = self.model.impute(inputs)
+ imputation_collector.append(imputed_data)
imputation_collector = torch.cat(imputation_collector)
- imputation_collector = imputation_collector
+ imputation_collector = imputation_collector.numpy()
mean_val_loss = cal_mae(
imputation_collector, val_X_intact, val_indicating_mask
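
As the docstrings above require, a path-based dataset is simply an h5 file whose layout mirrors the dict form and which must include the key 'X'. A minimal sketch of producing such a file (the array values and the path are placeholders):

import h5py
import numpy as np

train_X = np.random.randn(100, 48, 37).astype("float32")  # placeholder [n_samples, n_steps, n_features]
train_X[train_X < -1.5] = np.nan  # keep some missing values, as the imputers expect

with h5py.File("./train_set.h5", "w") as hf:  # hypothetical path
    hf.create_dataset("X", data=train_X)  # the mandatory key 'X'
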
diff --git a/pypots/imputation/brits.py b/pypots/imputation/brits.py
index d15c8e33..19d8450d 100644
--- a/pypots/imputation/brits.py
+++ b/pypots/imputation/brits.py
@@ -6,6 +6,7 @@
# License: GPL-v3
import math
+import numpy as np
import torch
import torch.nn as nn
@@ -336,7 +337,7 @@ def impute(self, inputs):
imputed_data_b = {"imputed_data_b": imputed_data_b}
imputed_data_b = self.reverse(imputed_data_b)["imputed_data_b"]
imputed_data = (imputed_data_f + imputed_data_b) / 2
- return imputed_data
+ return imputed_data, None
@staticmethod
def get_consistency_loss(pred_f, pred_b):
@@ -495,40 +496,58 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X, val_X=None):
- """Fit the model on the given training data.
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the imputer on the given data.
Parameters
----------
- train_X : array-like, shape of [n_samples, n_steps, n_features],
- Data for training.
-
- val_X : array-like, optional, shape of [n_samples, n_steps, n_features],
- Data for validating.
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ val_set : dict or str,
+ The dataset for model validation, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validation and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
Returns
-------
self : object,
- Trained model.
+ The trained imputer.
"""
- train_X = self.check_input(self.n_steps, self.n_features, train_X)
- if val_X is not None:
- val_X = self.check_input(self.n_steps, self.n_features, val_X)
-
- training_set = DatasetForBRITS(train_X) # time_gaps is necessary for BRITS
+ training_set = DatasetForBRITS(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
- if val_X is None:
+ if val_set is None:
self._train_model(training_loader)
else:
+ if isinstance(val_set, str):
+ import h5py
+
+ with h5py.File(val_set, "r") as hf:
+ val_X = hf["X"]
+ val_set = {"X": val_X}
+
val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar(
- val_X, 0.2
+ val_set["X"], 0.2
)
- val_X = masked_fill(val_X, 1 - val_X_missing_mask, torch.nan)
- val_set = DatasetForBRITS(val_X)
+ val_X = masked_fill(val_X, 1 - val_X_missing_mask, np.nan)
+ val_set["X"] = val_X
+ val_set = DatasetForBRITS(val_set)
val_loader = DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
+
self._train_model(
training_loader, val_loader, val_X_intact, val_X_indicating_mask
)
@@ -610,8 +629,23 @@ def assemble_input_for_testing(self, data) -> dict:
"""
return self.assemble_input_for_training(data)
- def impute(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def impute(self, X, file_type="h5py"):
+ """Impute missing values in the given data with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. an h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples, sequence length (time steps), n_features],
+ Imputed data.
+ """
self.model.eval() # set the model as eval status to freeze it.
test_set = DatasetForBRITS(X)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
@@ -620,7 +654,7 @@ def impute(self, X):
with torch.no_grad():
for idx, data in enumerate(test_loader):
inputs = self.assemble_input_for_testing(data)
- imputed_data = self.model.impute(inputs)
+ imputed_data, _ = self.model.impute(inputs)
imputation_collector.append(imputed_data)
imputation_collector = torch.cat(imputation_collector)
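
Note that the slice in hf["X"][:] is what copies the validation data out of the file; a bare hf["X"] handle is only valid while the file is open, which is why the validation set is read eagerly before mcar is applied. A tiny illustration:

import h5py

with h5py.File("./val_set.h5", "r") as hf:  # hypothetical file written as sketched earlier
    val_X = hf["X"][:]  # materializes a NumPy copy, safe to use after the file closes

print(type(val_X), val_X.shape)  # numpy.ndarray of shape [n_samples, n_steps, n_features]
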
diff --git a/pypots/imputation/locf.py b/pypots/imputation/locf.py
index 2d391bb9..9bdde882 100644
--- a/pypots/imputation/locf.py
+++ b/pypots/imputation/locf.py
@@ -26,7 +26,35 @@ def __init__(self, nan=0):
super().__init__("cpu")
self.nan = nan
- def fit(self, train_X, val_X=None):
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the imputer on the given data.
+
+ Parameters
+ ----------
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ val_set : dict or str,
+ The dataset for model validation, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validation and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
+
+ Returns
+ -------
+ self : object,
+ The trained imputer.
+ """
warnings.warn(
"LOCF (Last Observed Carried Forward) imputation class has no parameter to train. "
"Please run func impute(X) directly."
@@ -103,19 +131,27 @@ def locf_torch(self, X):
return X_imputed
- def impute(self, X):
- """Impute missing values
+ def impute(self, X, file_type="h5py"):
+ """Impute missing values in the given data with the trained model.
Parameters
----------
- X : array-like,
- Time-series vectors containing missing values (NaN).
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. an h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
Returns
-------
- array-like,
- Imputed time series.
+ array-like, shape [n_samples, sequence length (time steps), n_features],
+ Imputed data.
"""
+
+ assert not isinstance(
+ X, str
+ ), "LOCF so far does not accept file input. It needs a specified Dataset class."
+ X = X["X"]
+
assert len(X.shape) == 3, (
f"Input X should have 3 dimensions [n_samples, n_steps, n_features], "
f"but the actual shape of X: {X.shape}"
diff --git a/pypots/imputation/saits.py b/pypots/imputation/saits.py
index d32bd0ab..3870b218 100644
--- a/pypots/imputation/saits.py
+++ b/pypots/imputation/saits.py
@@ -6,6 +6,7 @@
# Created by Wenjie Du
# License: GPL-v3
+import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
@@ -214,23 +215,55 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X, val_X=None):
- train_X = self.check_input(self.n_steps, self.n_features, train_X)
- if val_X is not None:
- val_X = self.check_input(self.n_steps, self.n_features, val_X)
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the imputer on the given data.
- training_set = DatasetForMIT(train_X)
+ Parameters
+ ----------
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ val_set : dict or str,
+ The dataset for model validation, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validation and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
+
+ Returns
+ -------
+ self : object,
+ The trained imputer.
+ """
+ training_set = DatasetForMIT(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
- if val_X is None:
+ if val_set is None:
self._train_model(training_loader)
else:
+ if isinstance(val_set, str):
+ import h5py
+
+ with h5py.File(val_set, "r") as hf:
+ val_X = hf["X"][:]
+ val_set = {"X": val_X}
+
val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar(
- val_X, 0.2
+ val_set["X"], 0.2
)
- val_X = masked_fill(val_X, 1 - val_X_missing_mask, torch.nan)
- val_set = DatasetForMIT(val_X)
+ val_X = masked_fill(val_X, 1 - val_X_missing_mask, np.nan)
+ val_set["X"] = val_X
+ val_set = BaseDataset(val_set)
val_loader = DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
self._train_model(
training_loader, val_loader, val_X_intact, val_X_indicating_mask
@@ -282,7 +315,13 @@ def assemble_input_for_validating(self, data) -> dict:
inputs : dict,
A python dictionary contains the input data for model validating.
"""
- return self.assemble_input_for_training(data)
+ indices, X, missing_mask = data
+
+ inputs = {
+ "X": X,
+ "missing_mask": missing_mask,
+ }
+ return inputs
def assemble_input_for_testing(self, data) -> dict:
"""Assemble the given data into a dictionary for testing input.
@@ -301,12 +340,27 @@ def assemble_input_for_testing(self, data) -> dict:
inputs : dict,
A python dictionary contains the input data for model testing.
"""
- return self.assemble_input_for_training(data)
+ return self.assemble_input_for_validating(data)
- def impute(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ def impute(self, X, file_type="h5py"):
+ """Impute missing values in the given data with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. an h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
+
+ Returns
+ -------
+ array-like, shape [n_samples, sequence length (time steps), n_features],
+ Imputed data.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = BaseDataset(X)
+ test_set = BaseDataset(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
imputation_collector = []
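
With these changes SAITS can also be driven straight from files, which is exactly what the new pypots/tests/test_data.py below exercises; a condensed sketch (the h5 files are assumed to have been written beforehand, e.g. as in that test):

from pypots.imputation import SAITS
from pypots.tests.unified_data_for_test import DATA

saits = SAITS(DATA["n_steps"], DATA["n_features"], n_layers=2, d_model=256, d_inner=128,
              n_head=4, d_k=64, d_v=64, dropout=0.1, epochs=1)

# path strings instead of dicts: DatasetForMIT / BaseDataset then read lazily from the h5 files
saits.fit(train_set="./imputation_train_set.h5", val_set="./imputation_val_set.h5")
imputation = saits.impute(X="./test_set.h5")  # this file must also contain the key 'X'
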
diff --git a/pypots/imputation/transformer.py b/pypots/imputation/transformer.py
index c84c30b1..4b89a94c 100644
--- a/pypots/imputation/transformer.py
+++ b/pypots/imputation/transformer.py
@@ -304,23 +304,56 @@ def __init__(
self.model = self.model.to(self.device)
self._print_model_size()
- def fit(self, train_X, val_X=None):
- train_X = self.check_input(self.n_steps, self.n_features, train_X)
- if val_X is not None:
- val_X = self.check_input(self.n_steps, self.n_features, val_X)
+ def fit(self, train_set, val_set=None, file_type="h5py"):
+ """Train the imputer on the given data.
- training_set = DatasetForMIT(train_X)
+ Parameters
+ ----------
+ train_set : dict or str,
+ The dataset for model training, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for training and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ val_set : dict or str,
+ The dataset for model validation, should be a dictionary including the key 'X',
+ or a path string locating a data file.
+ If it is a dict, X should be array-like of shape [n_samples, sequence length (time steps), n_features],
+ which is time-series data for validation and can contain missing values.
+ If it is a path string, the path should point to a data file, e.g. an h5 file, which contains
+ key-value pairs like a dict, and it has to include the key 'X'.
+
+ file_type : str, default = "h5py",
+ The type of the given file if train_set and val_set are path strings.
+
+ Returns
+ -------
+ self : object,
+ The trained imputer.
+ """
+
+ training_set = DatasetForMIT(train_set, file_type)
training_loader = DataLoader(
training_set, batch_size=self.batch_size, shuffle=True
)
- if val_X is None:
+ if val_set is None:
self._train_model(training_loader)
else:
+ if isinstance(val_set, str):
+ import h5py
+
+ with h5py.File(val_set, "r") as hf:
+ val_X = hf["X"]
+ val_set = {"X": val_X}
+
val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar(
- val_X, 0.2
+ val_set["X"], 0.2
)
val_X = masked_fill(val_X, 1 - val_X_missing_mask, np.nan)
- val_set = DatasetForMIT(val_X)
+ val_set["X"] = val_X
+ val_set = BaseDataset(val_set)
val_loader = DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
self._train_model(
training_loader, val_loader, val_X_intact, val_X_indicating_mask
@@ -373,7 +406,14 @@ def assemble_input_for_validating(self, data) -> dict:
inputs : dict,
A python dictionary contains the input data for model validating.
"""
- return self.assemble_input_for_training(data)
+ indices, X, missing_mask = data
+
+ inputs = {
+ "X": X,
+ "missing_mask": missing_mask,
+ }
+
+ return inputs
def assemble_input_for_testing(self, data) -> dict:
"""Assemble the given data into a dictionary for testing input.
@@ -392,12 +432,27 @@ def assemble_input_for_testing(self, data) -> dict:
inputs : dict,
A python dictionary contains the input data for model testing.
"""
- return self.assemble_input_for_training(data)
+ return self.assemble_input_for_validating(data)
+
+ def impute(self, X, file_type="h5py"):
+ """Impute missing values in the given data with the trained model.
+
+ Parameters
+ ----------
+ X : array-like or str,
+ The data samples for testing, should be array-like of shape [n_samples, sequence length (time steps),
+ n_features], or a path string locating a data file, e.g. an h5 file.
+
+ file_type : str, default = "h5py",
+ The type of the given file if X is a path string.
- def impute(self, X):
- X = self.check_input(self.n_steps, self.n_features, X)
+ Returns
+ -------
+ array-like, shape [n_samples, sequence length (time steps), n_features],
+ Imputed data.
+ """
self.model.eval() # set the model as eval status to freeze it.
- test_set = BaseDataset(X)
+ test_set = BaseDataset(X, file_type)
test_loader = DataLoader(test_set, batch_size=self.batch_size, shuffle=False)
imputation_collector = []
diff --git a/pypots/tests/environment_test.yml b/pypots/tests/environment_test.yml
index 44c3a21c..ceadbe60 100644
--- a/pypots/tests/environment_test.yml
+++ b/pypots/tests/environment_test.yml
@@ -10,14 +10,13 @@ dependencies:
- conda-forge::scipy
- conda-forge::pandas
- conda-forge::scikit-learn
- - conda-forge::matplotlib
+ - conda-forge::h5py
- conda-forge::tensorboard
- conda-forge::pip
- conda-forge::pytest-cov
- - conda-forge::pytest-xdist
+ - conda-forge::pytest-xdist>=2.5.0
- conda-forge::coverage
+ - conda-forge::pycorruptor
+ - conda-forge::tsdb
- pytorch::pytorch==1.11.0
- pyg::pyg==2.0.4
- - pip:
- - pycorruptor==0.0.4
- - tsdb==0.0.7
diff --git a/pypots/tests/test_classification.py b/pypots/tests/test_classification.py
index 8148980b..36f48484 100644
--- a/pypots/tests/test_classification.py
+++ b/pypots/tests/test_classification.py
@@ -7,33 +7,51 @@
import unittest
+import pytest
+
from pypots.classification import BRITS, GRUD, Raindrop
from pypots.tests.unified_data_for_test import DATA
-from pypots.utils.metrics import cal_binary_classification_metrics
from pypots.utils.logging import logger
+from pypots.utils.metrics import cal_binary_classification_metrics
EPOCHS = 5
+TRAIN_SET = {"X": DATA["train_X"], "y": DATA["train_y"]}
+VAL_SET = {"X": DATA["val_X"], "y": DATA["val_y"]}
+TEST_SET = {"X": DATA["test_X"]}
+
class TestBRITS(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.train_y = DATA["train_y"]
- self.val_X = DATA["val_X"]
- self.val_y = DATA["val_y"]
- self.test_X = DATA["test_X"]
- self.test_y = DATA["test_y"]
- logger.info("Running test cases for BRITS...")
- self.brits = BRITS(
- DATA["n_steps"],
- DATA["n_features"],
- 256,
- n_classes=DATA["n_classes"],
- epochs=EPOCHS,
+ logger.info("Running tests for a classification model BRITS...")
+
+ # initialize a BRITS model
+ brits = BRITS(
+ DATA["n_steps"],
+ DATA["n_features"],
+ 256,
+ n_classes=DATA["n_classes"],
+ epochs=EPOCHS,
+ )
+
+ @pytest.mark.xdist_group(name="classification-brits")
+ def test_0_fit(self):
+ self.brits.fit(TRAIN_SET, VAL_SET)
+
+ @pytest.mark.xdist_group(name="classification-brits")
+ def test_1_classify(self):
+ predictions = self.brits.classify(TEST_SET)
+ metrics = cal_binary_classification_metrics(predictions, DATA["test_y"])
+ logger.info(
+ f'ROC_AUC: {metrics["roc_auc"]}, \n'
+ f'PR_AUC: {metrics["pr_auc"]},\n'
+ f'F1: {metrics["f1"]},\n'
+ f'Precision: {metrics["precision"]},\n'
+ f'Recall: {metrics["recall"]},\n'
)
- self.brits.fit(self.train_X, self.train_y, self.val_X, self.val_y)
+ assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5"
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="classification-brits")
+ def test_2_parameters(self):
assert hasattr(self.brits, "model") and self.brits.model is not None
assert hasattr(self.brits, "optimizer") and self.brits.optimizer is not None
@@ -46,9 +64,27 @@ def test_parameters(self):
and self.brits.best_model_dict is not None
)
- def test_classify(self):
- predictions = self.brits.classify(self.test_X)
- metrics = cal_binary_classification_metrics(predictions, self.test_y)
+
+class TestGRUD(unittest.TestCase):
+ logger.info("Running tests for a classification model GRUD...")
+
+ # initialize a GRUD model
+ grud = GRUD(
+ DATA["n_steps"],
+ DATA["n_features"],
+ 256,
+ n_classes=DATA["n_classes"],
+ epochs=EPOCHS,
+ )
+
+ @pytest.mark.xdist_group(name="classification-grud")
+ def test_0_fit(self):
+ self.grud.fit(TRAIN_SET, VAL_SET)
+
+ @pytest.mark.xdist_group(name="classification-grud")
+ def test_1_classify(self):
+ predictions = self.grud.classify(TEST_SET)
+ metrics = cal_binary_classification_metrics(predictions, DATA["test_y"])
logger.info(
f'ROC_AUC: {metrics["roc_auc"]}, \n'
f'PR_AUC: {metrics["pr_auc"]},\n'
@@ -58,26 +94,8 @@ def test_classify(self):
)
assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5"
-
-class TestGRUD(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.train_y = DATA["train_y"]
- self.val_X = DATA["val_X"]
- self.val_y = DATA["val_y"]
- self.test_X = DATA["test_X"]
- self.test_y = DATA["test_y"]
- logger.info("Running test cases for GRUD...")
- self.grud = GRUD(
- DATA["n_steps"],
- DATA["n_features"],
- 256,
- n_classes=DATA["n_classes"],
- epochs=EPOCHS,
- )
- self.grud.fit(self.train_X, self.train_y, self.val_X, self.val_y)
-
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="classification-grud")
+ def test_2_parameters(self):
assert hasattr(self.grud, "model") and self.grud.model is not None
assert hasattr(self.grud, "optimizer") and self.grud.optimizer is not None
@@ -90,9 +108,35 @@ def test_parameters(self):
and self.grud.best_model_dict is not None
)
- def test_classify(self):
- predictions = self.grud.classify(self.test_X)
- metrics = cal_binary_classification_metrics(predictions, self.test_y)
+
+class TestRaindrop(unittest.TestCase):
+ logger.info("Running tests for a classification model Raindrop...")
+
+ # initialize a Raindrop model
+ raindrop = Raindrop(
+ DATA["n_features"],
+ 2,
+ DATA["n_features"] * 4,
+ 256,
+ 2,
+ DATA["n_classes"],
+ 0.3,
+ DATA["n_steps"],
+ 0,
+ "mean",
+ False,
+ False,
+ epochs=EPOCHS,
+ )
+
+ @pytest.mark.xdist_group(name="classification-raindrop")
+ def test_0_fit(self):
+ self.raindrop.fit(TRAIN_SET, VAL_SET)
+
+ @pytest.mark.xdist_group(name="classification-raindrop")
+ def test_1_classify(self):
+ predictions = self.raindrop.classify(TEST_SET)
+ metrics = cal_binary_classification_metrics(predictions, DATA["test_y"])
logger.info(
f'ROC_AUC: {metrics["roc_auc"]}, \n'
f'PR_AUC: {metrics["pr_auc"]},\n'
@@ -102,34 +146,8 @@ def test_classify(self):
)
assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5"
-
-class TestRaindrop(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.train_y = DATA["train_y"]
- self.val_X = DATA["val_X"]
- self.val_y = DATA["val_y"]
- self.test_X = DATA["test_X"]
- self.test_y = DATA["test_y"]
- logger.info("Running test cases for Raindrop...")
- self.raindrop = Raindrop(
- DATA["n_features"],
- 2,
- DATA["n_features"] * 4,
- 256,
- 2,
- DATA["n_classes"],
- 0.3,
- DATA["n_steps"],
- 0,
- "mean",
- False,
- False,
- epochs=EPOCHS,
- )
- self.raindrop.fit(self.train_X, self.train_y, self.val_X, self.val_y)
-
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="classification-raindrop")
+ def test_2_parameters(self):
assert hasattr(self.raindrop, "model") and self.raindrop.model is not None
assert (
@@ -144,18 +162,6 @@ def test_parameters(self):
and self.raindrop.best_model_dict is not None
)
- def test_classify(self):
- predictions = self.raindrop.classify(self.test_X)
- metrics = cal_binary_classification_metrics(predictions, self.test_y)
- logger.info(
- f'ROC_AUC: {metrics["roc_auc"]}, \n'
- f'PR_AUC: {metrics["pr_auc"]},\n'
- f'F1: {metrics["f1"]},\n'
- f'Precision: {metrics["precision"]},\n'
- f'Recall: {metrics["recall"]},\n'
- )
- assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5"
-
if __name__ == "__main__":
unittest.main()
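
The restructuring above follows one pattern throughout the suite: the model is built once as a class attribute, the test methods are numbered so that fitting runs before the checks, and every method carries the same xdist_group mark so that pytest-xdist with --dist=loadgroup keeps the whole group on a single worker. A toy illustration of the pattern (model and group names are made up):

import unittest

import pytest


class _ToyModel:
    def __init__(self):
        self.trained = False

    def fit(self):
        self.trained = True


class TestPattern(unittest.TestCase):
    # created once at class-definition time, so every test method sees the same object
    model = _ToyModel()

    @pytest.mark.xdist_group(name="toy-group")  # --dist=loadgroup pins the group to one worker
    def test_0_fit(self):
        self.model.fit()

    @pytest.mark.xdist_group(name="toy-group")  # numbered names keep fit() before the assertion
    def test_1_check(self):
        assert self.model.trained
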
diff --git a/pypots/tests/test_clustering.py b/pypots/tests/test_clustering.py
index ce22c64a..15b00736 100644
--- a/pypots/tests/test_clustering.py
+++ b/pypots/tests/test_clustering.py
@@ -9,31 +9,39 @@
import unittest
import numpy as np
+import pytest
from pypots.clustering import VaDER, CRLI
-from pypots.utils.logging import logger
from pypots.tests.unified_data_for_test import DATA
+from pypots.utils.logging import logger
from pypots.utils.metrics import cal_rand_index, cal_cluster_purity
EPOCHS = 5
+TRAIN_SET = {"X": DATA["train_X"]}
+VAL_SET = {"X": DATA["val_X"]}
+TEST_SET = {"X": DATA["test_X"]}
-class TestCRLI(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.train_y = DATA["train_y"]
- logger.info("Running test cases for CRLI...")
- self.crli = CRLI(
- n_steps=DATA["n_steps"],
- n_features=DATA["n_features"],
- n_clusters=DATA["n_classes"],
- n_generator_layers=2,
- rnn_hidden_size=128,
- epochs=EPOCHS,
- )
- self.crli.fit(self.train_X)
- def test_parameters(self):
+class TestCRLI(unittest.TestCase):
+ logger.info("Running tests for a clustering model CRLI...")
+
+ # initialize a CRLI model
+ crli = CRLI(
+ n_steps=DATA["n_steps"],
+ n_features=DATA["n_features"],
+ n_clusters=DATA["n_classes"],
+ n_generator_layers=2,
+ rnn_hidden_size=128,
+ epochs=EPOCHS,
+ )
+
+ @pytest.mark.xdist_group(name="clustering-crli")
+ def test_0_fit(self):
+ self.crli.fit(TRAIN_SET)
+
+ @pytest.mark.xdist_group(name="clustering-crli")
+ def test_1_parameters(self):
assert hasattr(self.crli, "model") and self.crli.model is not None
assert hasattr(self.crli, "G_optimizer") and self.crli.G_optimizer is not None
@@ -47,30 +55,47 @@ def test_parameters(self):
and self.crli.best_model_dict is not None
)
- def test_cluster(self):
- clustering = self.crli.cluster(self.train_X)
- RI = cal_rand_index(clustering, self.train_y)
- CP = cal_cluster_purity(clustering, self.train_y)
+ @pytest.mark.xdist_group(name="clustering-crli")
+ def test_2_cluster(self):
+ clustering = self.crli.cluster(TEST_SET)
+ RI = cal_rand_index(clustering, DATA["test_y"])
+ CP = cal_cluster_purity(clustering, DATA["test_y"])
logger.info(f"RI: {RI}\nCP: {CP}")
class TestVaDER(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.train_y = DATA["train_y"]
- logger.info("Running test cases for VaDER...")
- self.vader = VaDER(
- n_steps=DATA["n_steps"],
- n_features=DATA["n_features"],
- n_clusters=DATA["n_classes"],
- rnn_hidden_size=64,
- d_mu_stddev=5,
- pretrain_epochs=20,
- epochs=EPOCHS,
- )
- self.vader.fit(self.train_X)
+ logger.info("Running tests for a clustering model Transformer...")
+
+ # initialize a VaDER model
+ vader = VaDER(
+ n_steps=DATA["n_steps"],
+ n_features=DATA["n_features"],
+ n_clusters=DATA["n_classes"],
+ rnn_hidden_size=64,
+ d_mu_stddev=5,
+ pretrain_epochs=20,
+ epochs=EPOCHS,
+ )
+
+ @pytest.mark.xdist_group(name="clustering-vader")
+ def test_0_fit(self):
+ self.vader.fit(TRAIN_SET)
+
+ @pytest.mark.xdist_group(name="clustering-vader")
+ def test_1_cluster(self):
+ try:
+ clustering = self.vader.cluster(TEST_SET)
+ RI = cal_rand_index(clustering, DATA["test_y"])
+ CP = cal_cluster_purity(clustering, DATA["test_y"])
+ logger.info(f"RI: {RI}\nCP: {CP}")
+ except np.linalg.LinAlgError as e:
+ logger.error(
+ f"{e}\n"
+ "Got singular matrix, please try to retrain the model to fix this"
+ )
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="clustering-vader")
+ def test_2_parameters(self):
assert hasattr(self.vader, "model") and self.vader.model is not None
assert hasattr(self.vader, "optimizer") and self.vader.optimizer is not None
@@ -83,18 +108,6 @@ def test_parameters(self):
and self.vader.best_model_dict is not None
)
- def test_cluster(self):
- try:
- clustering = self.vader.cluster(self.train_X)
- RI = cal_rand_index(clustering, self.train_y)
- CP = cal_cluster_purity(clustering, self.train_y)
- logger.info(f"RI: {RI}\nCP: {CP}")
- except np.linalg.LinAlgError as e:
- logger.info(
- f"{e}\n"
- "Got singular matrix, please try to retrain the model to fix this"
- )
-
if __name__ == "__main__":
unittest.main()
diff --git a/pypots/tests/test_data.py b/pypots/tests/test_data.py
new file mode 100644
index 00000000..bf2c238d
--- /dev/null
+++ b/pypots/tests/test_data.py
@@ -0,0 +1,109 @@
+"""
+Test cases for data classes with the lazy-loading strategy of reading from files.
+"""
+
+# Created by Wenjie Du
+# License: GPL-v3
+
+import os
+import unittest
+
+import h5py
+import pytest
+
+from pypots.classification import BRITS, GRUD
+from pypots.imputation import SAITS
+from pypots.tests.unified_data_for_test import DATA
+from pypots.utils.logging import logger
+
+TRAIN_SET = "./train_set.h5"
+VAL_SET = "./val_set.h5"
+TEST_SET = "./test_set.h5"
+
+IMPUTATION_TRAIN_SET = "./imputation_train_set.h5"
+IMPUTATION_VAL_SET = "./imputation_val_set.h5"
+
+
+def save_data_set_into_h5(data, path):
+ with h5py.File(path, "w") as hf:
+ for i in data.keys():
+ tp = int if i == "y" else "float32"
+ hf.create_dataset(i, data=data[i].astype(tp))
+
+
+EPOCHS = 1
+
+
+class TestLazyLoadingClasses(unittest.TestCase):
+ logger.info("Running tests for Dataset classes with lazy-loading strategy...")
+
+ # initialize a SAITS model for testing DatasetForMIT and BaseDataset
+ saits = SAITS(
+ DATA["n_steps"],
+ DATA["n_features"],
+ n_layers=2,
+ d_model=256,
+ d_inner=128,
+ n_head=4,
+ d_k=64,
+ d_v=64,
+ dropout=0.1,
+ epochs=EPOCHS,
+ )
+
+ # initialize a BRITS model for testing DatasetForBRITS
+ brits = BRITS(
+ DATA["n_steps"],
+ DATA["n_features"],
+ 256,
+ n_classes=DATA["n_classes"],
+ epochs=EPOCHS,
+ )
+
+ # initialize a GRUD model for testing DatasetForGRUD
+ grud = GRUD(
+ DATA["n_steps"],
+ DATA["n_features"],
+ 256,
+ n_classes=DATA["n_classes"],
+ epochs=EPOCHS,
+ )
+
+ @pytest.mark.xdist_group(name="data-lazy-loading")
+ def test_0_save_datasets_into_files(self):
+ save_data_set_into_h5(
+ {"X": DATA["train_X"], "y": DATA["train_y"].astype(int)}, TRAIN_SET
+ )
+ save_data_set_into_h5(
+ {"X": DATA["val_X"], "y": DATA["val_y"].astype(int)}, VAL_SET
+ )
+ save_data_set_into_h5({"X": DATA["train_X"]}, IMPUTATION_TRAIN_SET)
+ save_data_set_into_h5({"X": DATA["val_X"]}, IMPUTATION_VAL_SET)
+
+ save_data_set_into_h5(
+ {
+ "X": DATA["test_X"],
+ "X_intact": DATA["test_X_intact"],
+ "X_indicating_mask": DATA["test_X_indicating_mask"],
+ },
+ TEST_SET,
+ )
+
+ @pytest.mark.xdist_group(name="data-lazy-loading")
+ def test_1_DatasetForMIT_BaseDataset(self):
+ self.saits.fit(train_set=IMPUTATION_TRAIN_SET, val_set=IMPUTATION_VAL_SET)
+ _ = self.saits.impute(X=TEST_SET)
+
+ @pytest.mark.xdist_group(name="data-lazy-loading")
+ def test_2_DatasetForBRITS(self):
+ self.brits.fit(train_set=TRAIN_SET, val_set=VAL_SET)
+ _ = self.brits.classify(X=TEST_SET)
+
+ @pytest.mark.xdist_group(name="data-lazy-loading")
+ def test_3_DatasetForGRUD(self):
+ self.grud.fit(train_set=TRAIN_SET, val_set=VAL_SET)
+ _ = self.grud.classify(X=TEST_SET)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/pypots/tests/test_forecasting.py b/pypots/tests/test_forecasting.py
index 27734a68..7a6bed4d 100644
--- a/pypots/tests/test_forecasting.py
+++ b/pypots/tests/test_forecasting.py
@@ -8,6 +8,7 @@
import unittest
import numpy as np
+import pytest
from pypots.forecasting import BTTF
from pypots.tests.unified_data_for_test import gene_random_walk_data
@@ -15,29 +16,29 @@
from pypots.utils.metrics import cal_mae
EPOCHS = 5
+DATA = gene_random_walk_data(n_steps=120, n_features=10)
+TEST_SET = {"X": DATA["test_X"][:, :100]}
class TestBTTF(unittest.TestCase):
- def setUp(self) -> None:
- DATA = gene_random_walk_data(n_steps=120, n_features=10)
- self.test_X = DATA["test_X"]
- self.test_X_intact = DATA["test_X_intact"]
- self.test_X_for_input = self.test_X[:, :100]
- logger.info("Running test cases for BTTF...")
- self.bttf = BTTF(
- 100,
- 10,
- 20,
- 2,
- 10,
- np.asarray([1, 2, 3, 10, 10 + 1, 10 + 2, 20, 20 + 1, 20 + 2]),
- 5,
- 5,
- )
-
- def test_forecasting(self):
- predictions = self.bttf.forecast(self.test_X_for_input)
- mae = cal_mae(predictions, self.test_X_intact[:, 100:])
+ logger.info("Running tests for a forecasting model BTTF...")
+
+ # initialize a BTTF model
+ bttf = BTTF(
+ 100,
+ 10,
+ 20,
+ 2,
+ 10,
+ np.asarray([1, 2, 3, 10, 10 + 1, 10 + 2, 20, 20 + 1, 20 + 2]),
+ 5,
+ 5,
+ )
+
+ @pytest.mark.xdist_group(name="forecasting-bttf")
+ def test_0_forecasting(self):
+ predictions = self.bttf.forecast(TEST_SET)
+ mae = cal_mae(predictions, DATA["test_X_intact"][:, 100:])
logger.info(f"prediction MAE: {mae}")
diff --git a/pypots/tests/test_imputation.py b/pypots/tests/test_imputation.py
index 957a4d34..34d75153 100644
--- a/pypots/tests/test_imputation.py
+++ b/pypots/tests/test_imputation.py
@@ -9,6 +9,7 @@
import unittest
import numpy as np
+import pytest
from pypots.imputation import (
SAITS,
@@ -17,35 +18,50 @@
LOCF,
)
from pypots.tests.unified_data_for_test import DATA
-from pypots.utils.metrics import cal_mae
from pypots.utils.logging import logger
+from pypots.utils.metrics import cal_mae
EPOCH = 5
+TRAIN_SET = {"X": DATA["train_X"]}
+VAL_SET = {"X": DATA["val_X"]}
+TEST_SET = {"X": DATA["test_X"]}
+
class TestSAITS(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.val_X = DATA["val_X"]
- self.test_X = DATA["test_X"]
- self.test_X_intact = DATA["test_X_intact"]
- self.test_X_indicating_mask = DATA["test_X_indicating_mask"]
- logger.info("Running test cases for SAITS...")
- self.saits = SAITS(
- DATA["n_steps"],
- DATA["n_features"],
- n_layers=2,
- d_model=256,
- d_inner=128,
- n_head=4,
- d_k=64,
- d_v=64,
- dropout=0.1,
- epochs=EPOCH,
+ logger.info("Running tests for an imputation model SAITS...")
+
+ # initialize a SAITS model
+ saits = SAITS(
+ DATA["n_steps"],
+ DATA["n_features"],
+ n_layers=2,
+ d_model=256,
+ d_inner=128,
+ n_head=4,
+ d_k=64,
+ d_v=64,
+ dropout=0.1,
+ epochs=EPOCH,
+ )
+
+ @pytest.mark.xdist_group(name="imputation-saits")
+ def test_0_fit(self):
+ self.saits.fit(TRAIN_SET, VAL_SET)
+
+ @pytest.mark.xdist_group(name="imputation-saits")
+ def test_1_impute(self):
+ imputed_X = self.saits.impute(TEST_SET)
+ assert not np.isnan(
+ imputed_X
+ ).any(), "Output still has missing values after running impute()."
+ test_MAE = cal_mae(
+ imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"]
)
- self.saits.fit(self.train_X, self.val_X)
+ logger.info(f"SAITS test_MAE: {test_MAE}")
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="imputation-saits")
+ def test_2_parameters(self):
assert hasattr(self.saits, "model") and self.saits.model is not None
assert hasattr(self.saits, "optimizer") and self.saits.optimizer is not None
@@ -58,38 +74,41 @@ def test_parameters(self):
and self.saits.best_model_dict is not None
)
- def test_impute(self):
- imputed_X = self.saits.impute(self.test_X)
+
+class TestTransformer(unittest.TestCase):
+ logger.info("Running tests for an imputation model Transformer...")
+
+ # initialize a Transformer model
+ transformer = Transformer(
+ DATA["n_steps"],
+ DATA["n_features"],
+ n_layers=2,
+ d_model=256,
+ d_inner=128,
+ n_head=4,
+ d_k=64,
+ d_v=64,
+ dropout=0.1,
+ epochs=EPOCH,
+ )
+
+ @pytest.mark.xdist_group(name="imputation-transformer")
+ def test_0_fit(self):
+ self.transformer.fit(TRAIN_SET, VAL_SET)
+
+ @pytest.mark.xdist_group(name="imputation-transformer")
+ def test_1_impute(self):
+ imputed_X = self.transformer.impute(TEST_SET)
assert not np.isnan(
imputed_X
).any(), "Output still has missing values after running impute()."
- test_MAE = cal_mae(imputed_X, self.test_X_intact, self.test_X_indicating_mask)
- logger.info(f"SAITS test_MAE: {test_MAE}")
-
-
-class TestTransformer(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.val_X = DATA["val_X"]
- self.test_X = DATA["test_X"]
- self.test_X_intact = DATA["test_X_intact"]
- self.test_X_indicating_mask = DATA["test_X_indicating_mask"]
- logger.info("Running test cases for Transformer...")
- self.transformer = Transformer(
- DATA["n_steps"],
- DATA["n_features"],
- n_layers=2,
- d_model=256,
- d_inner=128,
- n_head=4,
- d_k=64,
- d_v=64,
- dropout=0.1,
- epochs=EPOCH,
+ test_MAE = cal_mae(
+ imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"]
)
- self.transformer.fit(self.train_X, self.val_X)
+ logger.info(f"Transformer test_MAE: {test_MAE}")
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="imputation-transformer")
+ def test_2_parameters(self):
assert hasattr(self.transformer, "model") and self.transformer.model is not None
assert (
@@ -105,27 +124,30 @@ def test_parameters(self):
and self.transformer.best_model_dict is not None
)
- def test_impute(self):
- imputed_X = self.transformer.impute(self.test_X)
+
+class TestBRITS(unittest.TestCase):
+ logger.info("Running tests for an imputation model BRITS...")
+
+ # initialize a BRITS model
+ brits = BRITS(DATA["n_steps"], DATA["n_features"], 256, epochs=EPOCH)
+
+ @pytest.mark.xdist_group(name="imputation-brits")
+ def test_0_fit(self):
+ self.brits.fit(TRAIN_SET, VAL_SET)
+
+ @pytest.mark.xdist_group(name="imputation-brits")
+ def test_1_impute(self):
+ imputed_X = self.brits.impute(TEST_SET)
assert not np.isnan(
imputed_X
).any(), "Output still has missing values after running impute()."
- test_MAE = cal_mae(imputed_X, self.test_X_intact, self.test_X_indicating_mask)
- logger.info(f"Transformer test_MAE: {test_MAE}")
-
+ test_MAE = cal_mae(
+ imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"]
+ )
+ logger.info(f"BRITS test_MAE: {test_MAE}")
-class TestBRITS(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.val_X = DATA["val_X"]
- self.test_X = DATA["test_X"]
- self.test_X_intact = DATA["test_X_intact"]
- self.test_X_indicating_mask = DATA["test_X_indicating_mask"]
- logger.info("Running test cases for BRITS...")
- self.brits = BRITS(DATA["n_steps"], DATA["n_features"], 256, epochs=EPOCH)
- self.brits.fit(self.train_X, self.val_X)
-
- def test_parameters(self):
+ @pytest.mark.xdist_group(name="imputation-brits")
+ def test_2_parameters(self):
assert hasattr(self.brits, "model") and self.brits.model is not None
assert hasattr(self.brits, "optimizer") and self.brits.optimizer is not None
@@ -138,38 +160,26 @@ def test_parameters(self):
and self.brits.best_model_dict is not None
)
- def test_impute(self):
- imputed_X = self.brits.impute(self.test_X)
- assert not np.isnan(
- imputed_X
- ).any(), "Output still has missing values after running impute()."
- test_MAE = cal_mae(imputed_X, self.test_X_intact, self.test_X_indicating_mask)
- logger.info(f"BRITS test_MAE: {test_MAE}")
-
class TestLOCF(unittest.TestCase):
- def setUp(self) -> None:
- self.train_X = DATA["train_X"]
- self.val_X = DATA["val_X"]
- self.test_X = DATA["test_X"]
- self.test_X_intact = DATA["test_X_intact"]
- self.test_X_indicating_mask = DATA["test_X_indicating_mask"]
- logger.info("Running test cases for LOCF...")
- self.locf = LOCF(nan=0)
-
- def test_parameters(self):
- assert hasattr(self.locf, "nan") and self.locf.nan is not None
+ logger.info("Running tests for an imputation model LOCF...")
+ locf = LOCF(nan=0)
- def test_impute(self):
- test_X_imputed = self.locf.impute(self.test_X)
+ @pytest.mark.xdist_group(name="imputation-locf")
+ def test_0_impute(self):
+ test_X_imputed = self.locf.impute(TEST_SET)
assert not np.isnan(
test_X_imputed
).any(), "Output still has missing values after running impute()."
test_MAE = cal_mae(
- test_X_imputed, self.test_X_intact, self.test_X_indicating_mask
+ test_X_imputed, DATA["test_X_intact"], DATA["test_X_indicating_mask"]
)
logger.info(f"LOCF test_MAE: {test_MAE}")
+ @pytest.mark.xdist_group(name="imputation-locf")
+ def test_1_parameters(self):
+ assert hasattr(self.locf, "nan") and self.locf.nan is not None
+
if __name__ == "__main__":
unittest.main()
diff --git a/pypots/tests/test_logging.py b/pypots/tests/test_logging.py
index 3ebc3fca..f3c888fe 100644
--- a/pypots/tests/test_logging.py
+++ b/pypots/tests/test_logging.py
@@ -13,30 +13,37 @@
class TestLogger(unittest.TestCase):
- def setUp(self) -> None:
- self.logger_creator = Logger(name="PyPOTS testing log", logging_level="debug")
- self.logger = self.logger_creator.logger
+ logger_creator = Logger(name="PyPOTS testing log", logging_level="debug")
+ logger = logger_creator.logger
def test_different_level_logging(self):
- self.logger.debug('debug')
- self.logger.info('info')
- self.logger.warning('warning')
- self.logger.error('error')
+ self.logger.debug("debug")
+ self.logger.info("info")
+ self.logger.warning("warning")
+ self.logger.error("error")
def test_changing_level(self):
- self.logger_creator.set_level('info')
- assert self.logger.level == 20, f'the level of logger is {self.logger.level}, not INFO'
- self.logger_creator.set_level('warning')
- assert self.logger.level == 30, f'the level of logger is {self.logger.level}, not WARNING'
- self.logger_creator.set_level('error')
- assert self.logger.level == 40, f'the level of logger is {self.logger.level}, not ERROR'
- self.logger_creator.set_level('debug')
- assert self.logger.level == 10, f'the level of logger is {self.logger.level}, not DEBUG'
+ self.logger_creator.set_level("info")
+ assert (
+ self.logger.level == 20
+ ), f"the level of logger is {self.logger.level}, not INFO"
+ self.logger_creator.set_level("warning")
+ assert (
+ self.logger.level == 30
+ ), f"the level of logger is {self.logger.level}, not WARNING"
+ self.logger_creator.set_level("error")
+ assert (
+ self.logger.level == 40
+ ), f"the level of logger is {self.logger.level}, not ERROR"
+ self.logger_creator.set_level("debug")
+ assert (
+ self.logger.level == 10
+ ), f"the level of logger is {self.logger.level}, not DEBUG"
def test_saving_log_into_file(self):
- self.logger_creator.set_saving_path('test_log', 'testing.log')
- assert os.path.exists('test_log/testing.log')
- shutil.rmtree('test_log', ignore_errors=True)
+ self.logger_creator.set_saving_path("test_log", "testing.log")
+ assert os.path.exists("test_log/testing.log")
+ shutil.rmtree("test_log", ignore_errors=True)
if __name__ == "__main__":
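
For reference, the Logger surface exercised above is small; a sketch based only on the calls visible in this test (the import of Logger and the saving path are assumptions):

from pypots.utils.logging import Logger  # assumed to sit next to the module-level `logger`

logger_creator = Logger(name="my experiment log", logging_level="debug")
log = logger_creator.logger  # behaves like a standard logging.Logger (levels 10/20/30/40)

log.info("training started")
logger_creator.set_level("warning")  # raise the threshold so debug/info records are dropped
logger_creator.set_saving_path("run_logs", "experiment.log")  # also writes records to run_logs/experiment.log
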
diff --git a/requirements.txt b/requirements.txt
index 59de6847..41a9e125 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,5 @@ scipy
tensorboard
pandas
pycorruptor
-tsdb
\ No newline at end of file
+tsdb
+h5py
diff --git a/setup.py b/setup.py
index ba9febff..9cafa889 100644
--- a/setup.py
+++ b/setup.py
@@ -42,6 +42,7 @@
"pandas",
"pycorruptor",
"tsdb",
+ "h5py",
],
setup_requires=["setuptools>=38.6.0"],
)