Skip to content

Commit

Permalink
Merge pull request #42 from WenjieDu/dev
Browse files Browse the repository at this point in the history
Save training logs into tensorboard files, enable setting num_workers of DataLoader, and add typing annotations
  • Loading branch information
WenjieDu authored Apr 8, 2023
2 parents 4654961 + fc01480 commit 4646f5c
Show file tree
Hide file tree
Showing 28 changed files with 1,623 additions and 909 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macOS-latest]
python-version: [3.7, 3.9]
python-version: ["3.7", "3.9", "3.10"]

steps:
- uses: actions/checkout@v3
Expand Down
153 changes: 125 additions & 28 deletions pypots/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,80 +7,137 @@

import os
from abc import ABC
from typing import Optional, Union

import torch
from torch.utils.tensorboard import SummaryWriter

from pypots.utils.files import create_dir_if_not_exist
from pypots.utils.logging import logger


class BaseModel(ABC):
"""Base class for all models."""
"""Base model class for all model implementations.
def __init__(self, device):
self.logger = {}
Parameters
----------
device : str or `torch.device`, default = None,
The device for the model to run on.
If not given, will try to use CUDA devices first, then CPUs. CUDA and CPU are so far the main devices for people
to train ML models. Other devices like Google TPU and Apple Silicon accelerator MPS may be added in the future.
tb_file_saving_path : str, default = None,
The path to save the tensorboard file, which contains the loss values recorded during training.
"""

def __init__(
self,
device: Optional[Union[str, torch.device]] = None,
tb_file_saving_path: str = None,
):
self.model = None

# set up the device for model running below
if device is None:
# if it is None, then
self.device = torch.device(
"cuda:0"
if torch.cuda.is_available() and torch.cuda.device_count() > 0
else "cpu"
)
logger.info(f"No given device, using default device: {self.device}")
else:
self.device = device
if isinstance(device, str):
self.device = torch.device(device)
elif isinstance(device, torch.device):
self.device = device
else:
raise TypeError(
f"device should be str or torch.device, but got {type(device)}"
)

# set up the summary writer for training log saving below
if isinstance(tb_file_saving_path, str):

from datetime import datetime

# get the current time to append to the dir name,
# so you can use the same tb_file_saving_path for multiple running
time_now = datetime.now().__format__("%Y-%m-%d_T%H:%M:%S")
# the actual directory name to save the tensorboard file
actual_tb_saving_dir_name = "tensorboard_" + time_now
actual_tb_file_saving_path = os.path.join(
tb_file_saving_path, actual_tb_saving_dir_name
)
os.makedirs(actual_tb_saving_dir_name) # create the dir for file saving
self.summary_writer = SummaryWriter(actual_tb_file_saving_path)
else:
# don't save the log if tb_file_saving_path isn't given, set summary_writer as None
self.summary_writer = None

def save_logs_to_tensorboard(self, saving_path):
"""Save logs (self.logger) into a tensorboard file.
def save_into_tb_file(self, step: int, stage: str, loss_dict: dict) -> None:
"""Saving training logs into the tensorboard file.
Parameters
----------
saving_path : str
Local disk path to save the tensorboard file.
step : int,
The current training step number.
stage : str,
The stage of the current operation, 'training' or 'validating'.
loss_dict : dict,
A dictionary containing items to log, should have at least one item, e.g. {'imputation loss': 0.05}
"""
# TODO: find a solution for log saving
raise IOError("This function is not ready for users.")
# tb_summary_writer = SummaryWriter(saving_path)
# tb_summary_writer.add_custom_scalars(self.logger)
# tb_summary_writer.close()
# logger.info(f'Log saved successfully to {saving_path}.')

def save_model(self, saving_dir, name, overwrite=False):
while len(loss_dict) > 0:
(item_name, loss) = loss_dict.popitem()
self.summary_writer.add_scalar(f"{item_name}/{stage}", loss, step)

def save_model(
self,
saving_dir: str,
file_name: str,
overwrite: bool = False,
) -> None:
"""Save the model to a disk file.
A .pypots extension will be appended to the filename if it does not already have one.
Please note that such an extension is not necessary, but to indicate the saved model is from PyPOTS framework so people can distinguish.
Please note that such an extension is not necessary, but to indicate the saved model is from PyPOTS framework
so people can distinguish.
Parameters
----------
saving_dir : str,
The given directory to save the model.
name : str,
file_name : str,
The file name of the model to be saved.
overwrite : bool,
overwrite : bool, default = False,
Whether to overwrite the model file if the path already exists.
"""
name = name + ".pypots" if name.split(".")[-1] != "pypots" else name
saving_path = os.path.join(saving_dir, name)
file_name = (
file_name + ".pypots" if file_name.split(".")[-1] != "pypots" else file_name
)
saving_path = os.path.join(saving_dir, file_name)

if os.path.exists(saving_path):
if overwrite:
logger.warning(
f"File {saving_path} exists. Argument `overwrite` is True. Overwriting now..."
)
else:
logger.error(f"File {saving_path} exists. Saving operation aborted.")
return
try:
create_dir_if_not_exist(saving_dir)
torch.save(self.model, saving_path)
logger.info(f"Saved successfully to {saving_path}.")
except Exception as e:
raise RuntimeError(f'{e} Failed to save the model to "{saving_path}"!')

def load_model(self, model_path):
def load_model(self, model_path: str) -> None:
"""Load the saved model from a disk file.
Parameters
Expand All @@ -106,12 +163,51 @@ def load_model(self, model_path):


class BaseNNModel(BaseModel):
"""Abstract class for all neural-network models."""
"""Abstract class for all neural-network models.
Parameters
----------
batch_size : int,
Size of the batch input into the model for one step.
epochs : int,
Training epochs, i.e. the maximum rounds of the model to be trained with.
patience : int,
Number of epochs the training procedure will keep if loss doesn't decrease.
Once exceeding the number, the training will stop.
learning_rate : float,
The learning rate of the optimizer.
weight_decay : float,
The weight decay of the optimizer.
num_workers : int, default = 0,
The number of subprocesses to use for data loading.
`0` means data loading will be in the main process, i.e. there won't be subprocesses.
device : str or `torch.device`, default = None,
The device for the model to run on.
If not given, will try to use CUDA devices first, then CPUs. CUDA and CPU are so far the main devices for people
to train ML models. Other devices like Google TPU and Apple Silicon accelerator MPS may be added in the future.
tb_file_saving_path : str, default = None,
The path to save the tensorboard file, which contains the loss values recorded during training.
"""

def __init__(
self, learning_rate, epochs, patience, batch_size, weight_decay, device
self,
batch_size: int,
epochs: int,
patience: int,
learning_rate: float,
weight_decay: float,
num_workers: int = 0,
device: Optional[Union[str, torch.device]] = None,
tb_file_saving_path: str = None,
):
super().__init__(device)
super().__init__(device, tb_file_saving_path)

# training hype-parameters
self.batch_size = batch_size
Expand All @@ -120,16 +216,17 @@ def __init__(
self.original_patience = patience
self.lr = learning_rate
self.weight_decay = weight_decay
self.num_workers = num_workers

self.model = None
self.optimizer = None
self.best_model_dict = None
self.best_loss = float("inf")
self.logger = {"training_loss": [], "validating_loss": []}

def _print_model_size(self):
def _print_model_size(self) -> None:
"""Print the number of trainable parameters in the initialized NN model."""
num_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
logger.info(
f"Model initialized successfully. Number of the trainable parameters: {num_params}"
f"Model initialized successfully with the number of trainable parameters: {num_params}"
)
76 changes: 52 additions & 24 deletions pypots/classification/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@


from abc import abstractmethod
from typing import Optional, Union

import numpy as np
import torch
from torch.utils.data import DataLoader

from pypots.base import BaseModel, BaseNNModel
from pypots.utils.logging import logger
Expand All @@ -18,11 +20,23 @@
class BaseClassifier(BaseModel):
"""Abstract class for all classification models."""

def __init__(self, device):
super().__init__(device)
def __init__(
self,
device: Optional[Union[str, torch.device]] = None,
tb_file_saving_path: str = None,
):
super().__init__(
device,
tb_file_saving_path,
)

@abstractmethod
def fit(self, train_set, val_set=None, file_type="h5py"):
def fit(
self,
train_set: Union[dict, str],
val_set: Optional[Union[dict, str]] = None,
file_type: str = "h5py",
) -> None:
"""Train the classifier on the given data.
Parameters
Expand All @@ -48,15 +62,15 @@ def fit(self, train_set, val_set=None, file_type="h5py"):
file_type : str, default = "h5py",
The type of the given file if train_set and val_set are path strings.
Returns
-------
self : object,
Trained classifier.
"""
return self
pass

@abstractmethod
def classify(self, X, file_type="h5py"):
def classify(
self,
X: Union[dict, str],
file_type: str = "h5py",
) -> np.ndarray:
"""Classify the input data with the trained model.
Parameters
Expand All @@ -78,22 +92,31 @@ def classify(self, X, file_type="h5py"):

class BaseNNClassifier(BaseNNModel, BaseClassifier):
def __init__(
self,
n_classes,
learning_rate,
epochs,
patience,
batch_size,
weight_decay,
device,
self,
n_classes: int,
batch_size: int,
epochs: int,
patience: int,
learning_rate: float,
weight_decay: float,
num_workers: int = 0,
device: Optional[Union[str, torch.device]] = None,
tb_file_saving_path: str = None,
):
super().__init__(
learning_rate, epochs, patience, batch_size, weight_decay, device
batch_size,
epochs,
patience,
learning_rate,
weight_decay,
num_workers,
device,
tb_file_saving_path,
)
self.n_classes = n_classes

@abstractmethod
def assemble_input_for_training(self, data) -> dict:
def _assemble_input_for_training(self, data) -> dict:
"""Assemble the given data into a dictionary for training input.
Parameters
Expand All @@ -109,7 +132,7 @@ def assemble_input_for_training(self, data) -> dict:
pass

@abstractmethod
def assemble_input_for_validating(self, data) -> dict:
def _assemble_input_for_validating(self, data) -> dict:
"""Assemble the given data into a dictionary for validating input.
Parameters
Expand All @@ -125,7 +148,7 @@ def assemble_input_for_validating(self, data) -> dict:
pass

@abstractmethod
def assemble_input_for_testing(self, data) -> dict:
def _assemble_input_for_testing(self, data) -> dict:
"""Assemble the given data into a dictionary for testing input.
Notes
Expand All @@ -148,7 +171,12 @@ def assemble_input_for_testing(self, data) -> dict:
"""
pass

def _train_model(self, training_loader, val_loader=None):
def _train_model(
self,
training_loader: DataLoader,
val_loader: DataLoader = None,
) -> None:

self.optimizer = torch.optim.Adam(
self.model.parameters(), lr=self.lr, weight_decay=self.weight_decay
)
Expand All @@ -162,7 +190,7 @@ def _train_model(self, training_loader, val_loader=None):
self.model.train()
epoch_train_loss_collector = []
for idx, data in enumerate(training_loader):
inputs = self.assemble_input_for_training(data)
inputs = self._assemble_input_for_training(data)
self.optimizer.zero_grad()
results = self.model.forward(inputs)
results["loss"].backward()
Expand All @@ -179,7 +207,7 @@ def _train_model(self, training_loader, val_loader=None):
epoch_val_loss_collector = []
with torch.no_grad():
for idx, data in enumerate(val_loader):
inputs = self.assemble_input_for_validating(data)
inputs = self._assemble_input_for_validating(data)
results = self.model.forward(inputs)
epoch_val_loss_collector.append(results["loss"].item())

Expand Down
Loading

0 comments on commit 4646f5c

Please sign in to comment.