Create a reusable ML model loader #325
Replies: 7 comments 15 replies
-
It might be worth checking out how the Hugging Face transformers library handles model loading. It makes all aspects of working with models (training, datasets, architectures, downloading, etc.) very accessible (see https://huggingface.co/docs/transformers/pipeline_tutorial). We could even use huggingface.co to store models and make them available for download with a single Python call.
Beta Was this translation helpful? Give feedback.
-
The example above assumes the following directory structure for the models:
Usage of the above example code might look like this import pandas as pd
from model_handler import SingleModel, DayModel
# Handlers that always serve the same model, whatever the timestamp.
consumption_single_model_handler = SingleModel(
    model_dir="models/consumption/single_model"
)
production_single_model_handler = SingleModel(
    model_dir="models/production/single_model"
)

# Handlers that serve a separate model per day of the week. The per-day
# directory is presumably model_dir_prefix + <day> + model_dir_suffix
# (TODO confirm against the DayModel definition used here).
consumption_day_model_handler = DayModel(
    model_dir_prefix="models/consumption/day_models/", model_dir_suffix=""
)
production_day_model_handler = DayModel(
    model_dir_prefix="models/production/day_models/", model_dir_suffix=""
)

# Ask every handler for the model that applies at one point in time.
timestamp = pd.Timestamp("2023-04-12 12:00:00")
(
    consumption_single_model,
    production_single_model,
    consumption_day_model,
    production_day_model,
) = (
    handler.get_model(timestamp)
    for handler in (
        consumption_single_model_handler,
        production_single_model_handler,
        consumption_day_model_handler,
        production_day_model_handler,
    )
)

# The retrieved models can now be used for predictions, e.g.:
# consumption_single_model.predict(...)
# consumption_day_model.predict(...)
# production_single_model.predict(...)
# production_day_model.predict(...) |
Beta Was this translation helpful? Give feedback.
-
# Here is an extension for having a separate model for each 15-minute window of a day:
class IntervalModel(ModelHandler):  # pylint: disable=too-few-public-methods
    """Store and return a separate model for each 15-minute interval."""

    # Length of one interval in minutes; it must divide 60 evenly.
    _INTERVAL_MINUTES = 15

    def __init__(
        self,
        model_dir_prefix: str,
        model_dir_suffix: str,
    ) -> None:
        """Store and return separate models for each 15-minute interval.

        Args:
            model_dir_prefix (str): Prefix path to the directory with model definition.
            model_dir_suffix (str): Suffix path to the directory with model definition.

        Note:
            model_dir_prefix + interval + model_dir_suffix should be the path to the
            directory with the model definition, where interval is the start of the
            interval formatted as "HHMM" ("0000", "0015", ..., "2345"). The model is
            read from the file "model.pkl" inside that directory.
        """
        super().__init__()
        # "HHMM" start of each interval -> path of its pickled model file.
        self.model_paths = {
            f"{i:02d}{j:02d}": os.path.join(
                model_dir_prefix + f"{i:02d}{j:02d}" + model_dir_suffix, "model.pkl"
            )
            for i in range(0, 24)
            for j in range(0, 60, self._INTERVAL_MINUTES)
        }
        self._load_models()
        # NOTE(review): one monitor is started per interval directory; confirm the
        # base class keeps every observer rather than only the latest one.
        for interval in self.model_paths:
            model_path = model_dir_prefix + interval + model_dir_suffix
            self.start_model_monitor(model_path)

    def _load_models(self):
        """Unpickle every interval model into ``self._models``, keyed by "HHMM"."""
        models = {}
        for interval, model_path in self.model_paths.items():
            # NOTE: unpickling can execute arbitrary code; model files must be trusted.
            with open(model_path, "rb") as f:
                models[interval] = pickle.load(f)
        # Key by the same "HHMM" strings that get_model() builds.
        self._models = models

    def get_model(self, timestamp: pd.Timestamp) -> ModelType:
        """Return model for the 15-minute interval containing the timestamp.

        The timestamp is floored to the start of its interval, so e.g. 16:48
        uses the model stored under "1645".

        Args:
            timestamp (pd.Timestamp): Timestamp for the model.

        Returns:
            ModelType: Model instance for the given timestamp.
        """
        minute = timestamp.minute - timestamp.minute % self._INTERVAL_MINUTES
        interval = f"{timestamp.hour:02d}{minute:02d}"
        return self._models[interval]


# The directory structure could look like the following:
|
Beta Was this translation helpful? Give feedback.
-
Model handler: the following is the current implementation of the model handler in the 24-hour forecast actor. It loads all models required for the 24-hour forecast into a dictionary and uses them one by one to make forecasts for the next 24 hours. """ Model handler for daily forecast actor. """
from abc import ABC, abstractmethod
from typing import Dict
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
import os
import pickle
from xgboost.sklearn import XGBRegressor
class ModelUpdateHandler(FileSystemEventHandler):
    """Handler for model updates.

    This class inherits from the FileSystemEventHandler class from the watchdog.events
    module. The purpose of this class is to handle events related to changes in the
    file system, specifically creations, modifications and replacements (moves) of
    files.
    """

    def __init__(self, reload_model_func):
        """Create handler for model updates.

        Args:
            reload_model_func: callable invoked with the path of a model file
                that should be reloaded.
        """
        super().__init__()
        self.reload_model_func = reload_model_func

    def on_any_event(self, event):
        """Handle any event.

        Directory events are ignored. Created files are only reported, modified
        files are reloaded, and moved files are reloaded from their destination.

        Args:
            event (watchdog.events.FileSystemEvent): Event that occurred.
        """
        if event.is_directory:
            return
        if event.event_type == 'created':
            print(f"Model file {event.src_path} has been created.")
        elif event.event_type == 'modified':
            print(f"Model file {event.src_path} has been modified. Reloading model...")
            self.reload_model_func(event.src_path)
        elif event.event_type == 'moved':
            # Saving atomically (write a temp file, then rename it over the model)
            # yields only a "moved" event whose destination is the model file.
            print(f"Model file {event.dest_path} has been replaced. Reloading model...")
            self.reload_model_func(event.dest_path)
class ModelHandler(ABC):
    """Interface for storing models.

    Abstract class defining an interface for storing and retrieving machine learning models.
    """

    def __init__(self, *args: str) -> None:
        """Create models and stores internally.

        Args:
            args (str): all arguments needed to specify the model directories.
                The first one, if given, is stored as ``model_dir_prefix``.
        """
        # Most recently started observer (kept for backward compatibility).
        self._observer = None
        # Every observer started by start_model_monitor(), so that
        # stop_model_monitor() can stop all of them.
        self._observers = []
        self._model = None
        # Subclasses may call super().__init__() without arguments; indexing
        # args[0] unconditionally raised IndexError in that case.
        self.model_dir_prefix = args[0] if args else None

    @abstractmethod
    def get_model(self, i_ahead: int) -> XGBRegressor:
        """Return model for the given 15-minute window ahead.

        Args:
            i_ahead (int): 15min window ahead.

        Returns:
            XGBRegressor: Model instance.
        """

    def start_model_monitor(self, model_dir: str):
        """Start monitoring the model directory for changes.

        A watchdog observer is scheduled (non-recursively) on
        ``os.path.dirname(model_dir)``; file events there are forwarded to
        ``reload_model`` through ``ModelUpdateHandler``. Pass a path with a
        trailing separator (e.g. ``"models/"``) to watch that directory itself
        rather than its parent. Each call starts an additional observer.

        Args:
            model_dir (str): Path to the directory with model definition.
        """
        print(f"Monitoring model directory {model_dir} for changes...")
        event_handler = ModelUpdateHandler(self.reload_model)
        observer = Observer()
        observer.schedule(event_handler, os.path.dirname(model_dir), recursive=False)
        observer.start()
        self._observers.append(observer)
        self._observer = observer

    def stop_model_monitor(self):
        """Stop monitoring the model directories for changes.

        Stops every observer started by ``start_model_monitor`` and waits for
        their threads to finish. Calling it again is a no-op.
        """
        observers = self._observers
        for observer in observers:
            observer.stop()
        for observer in observers:
            observer.join()
        self._observers = []
        self._observer = None

    def reload_model(self, model_path: str):
        """Reload the model when the model file is updated.

        This function reloads the model by deserializing it from the updated file.
        Once the model has been reloaded, a message is printed to the console
        indicating that the model has been reloaded.

        Args:
            model_path (str): Path to the model file.

        Note:
            model_path should be the path to the model file, not the path to the
            directory containing the model file. Unpickling can execute arbitrary
            code, so only trusted model files may be loaded.
        """
        with open(model_path, 'rb') as f:  # TODO: Check if pickle accepts extra metadata. No need for new file.
            self._model = pickle.load(f)
        print("Model reloaded.")
class SingleModel(ModelHandler):
    """Retrieve a single model, independently of time.

    The class loads a set of pre-trained models during initialization and
    stores them in a dictionary, where each key corresponds to the number
    of 15-minute intervals ahead of the current time.
    """

    def __init__(self, model_dir_prefix: str, num_models: int = 96) -> None:
        """Store and return a model for a particular 15min window ahead.

        Args:
            model_dir_prefix (str): Prefix path to the directory with model definition.
            num_models (int): Number of 15min windows ahead to load models for;
                models 1..num_models are loaded (96 windows cover 24 hours).

        Note:
            model_dir_prefix + "xgb_model_" + str(i_ahead) + ".pkl" should be the
            path to the saved model definition, so model_dir_prefix usually ends
            with a path separator.
        """
        # ModelHandler.__init__ reads its first positional argument as the model
        # directory prefix, so pass it on.
        super().__init__(model_dir_prefix)
        self._model_dir_prefix = model_dir_prefix
        self._num_models = num_models
        self._models: Dict[int, XGBRegressor] = self._load_models()

    def _model_path(self, i_ahead: int) -> str:
        """Return the pickle path of the model for ``i_ahead`` windows ahead."""
        return self._model_dir_prefix + 'xgb_model_' + str(i_ahead) + '.pkl'

    def _load_models(
        self
    ) -> Dict[int, XGBRegressor]:
        """Load all models.

        Returns:
            Dict[int, XGBRegressor]: Dictionary with models for each 15min window,
            keyed 1..num_models.
        """
        models: Dict[int, XGBRegressor] = {}
        for model_name_suffix in range(1, self._num_models + 1):
            path = self._model_path(model_name_suffix)
            # NOTE: unpickling can execute arbitrary code; model files must be trusted.
            with open(path, "rb") as pickle_file:
                models[model_name_suffix] = pickle.load(pickle_file)
        return models

    def reload_model(self, model_path: str):
        """Reload the model stored at ``model_path`` into the lookup table.

        The inherited implementation stores the model in ``self._model``, which
        ``get_model`` never reads; here the path is mapped back to its window
        index instead. Paths that are not one of the loaded model files are
        ignored.

        Args:
            model_path (str): Path to the model file that changed.
        """
        target = os.path.abspath(model_path)
        for i_ahead in self._models:
            if os.path.abspath(self._model_path(i_ahead)) == target:
                with open(model_path, "rb") as pickle_file:
                    self._models[i_ahead] = pickle.load(pickle_file)
                print("Model reloaded.")
                return

    def get_model(self, i_ahead: int) -> XGBRegressor:
        """Return a stored model.

        The i_ahead is an integer representing the number of 15-minute intervals
        ahead of the current time. The method returns the stored model
        instance for the specified interval.

        Args:
            i_ahead (int): 15min window ahead.

        Returns:
            XGBRegressor: Model instance.

        Raises:
            KeyError: if no model was loaded for ``i_ahead``.
        """
        return self._models[i_ahead]


# The handler could be used/tested the following way (note that this will
# probably have to be run in an infinite loop to be able to constantly monitor
# the directory):
def test_model_handler():
    '''Test model handler.'''
    _loaded_models_dir = '/dir/to/loaded/models/'
    model_handler = SingleModel(_loaded_models_dir)
    model_handler.start_model_monitor(_loaded_models_dir)
    try:
        model = model_handler.get_model(1)
        assert model is not None
        assert model.__class__.__name__ == 'XGBRegressor'
        print(model)
    finally:
        # Always stop the watchdog observer, even when an assertion fails.
        model_handler.stop_model_monitor()


# As discussed with @daniel-zullo-frequenz earlier, one way to load the models
# at the moment could be that we save the models into specific directories as
# follows:
Monitoring (with
|
Beta Was this translation helpful? Give feedback.
-
We should also use FileWatcher, a channel receiver that watches a given list of paths for file events (file added, modified and deleted).
Beta Was this translation helpful? Give feedback.
-
To sum up, this is the generic directory/path structure for storing and loading models, covering the use cases discussed so far:
I have implemented (draft) the single, day and interval models using ModelHandler: """Load, update, monitor and retrieve machine learning models."""
from __future__ import annotations
import asyncio
import os
import pickle
from abc import ABC, abstractmethod
from datetime import datetime
from typing import TypeVar
from frequenz.channels.util import FileWatcher, Select
from frequenz.sdk._internal._asyncio import cancel_and_await
T = TypeVar("T")


class ModelHandler(ABC):
    """Interface for loading, updating, and monitoring machine learning models."""

    def __init__(self) -> None:
        """Handle machine learning models."""
        self._select: Select | None = None
        self._monitoring_task: asyncio.Task[None] | None = None

    @abstractmethod
    def get_model(self, index: int | str | datetime) -> T:
        """Get the model for the given index.

        Args:
            index: the index to get the model.

        Returns:
            the model instance.
        """

    @abstractmethod
    def reload_model(self, model_path: str) -> None:
        """Reload the model when the model file is updated.

        Args:
            model_path: the model path to be reloaded.
        """

    async def start_model_monitor(self, model_paths: list[str]):
        """Start monitoring the model paths for changes.

        Runs until the underlying ``Select`` stops yielding. Created and modified
        files are passed to ``reload_model``. Creations trigger a reload as well
        because a file replaced by renaming a new file over it may be reported
        as a creation.

        Args:
            model_paths: the model paths to be monitored (any iterable of paths).
        """
        print(f"Monitoring model paths for changes: {model_paths}")
        file_watcher = FileWatcher(paths=list(model_paths))
        self._select = Select(file_watcher=file_watcher)
        while await self._select.ready():
            if msg := self._select.file_watcher:
                event = msg.inner
                if event.type == FileWatcher.EventType.CREATE:
                    print(f"Model has been created: {str(event.path)}")
                elif event.type == FileWatcher.EventType.MODIFY:
                    print(
                        f"Model file {str(event.path)} has been modified. Reloading model..."
                    )
                else:
                    continue
                self._reload_or_report(str(event.path))

    def _reload_or_report(self, model_path: str) -> None:
        """Reload ``model_path``, printing instead of raising on read errors.

        An exception escaping here would end the monitoring loop for good, for
        example if a file is read while it is still being written.
        """
        try:
            self.reload_model(model_path)
        except (OSError, EOFError, pickle.UnpicklingError) as err:
            print(f"Failed to reload model {model_path}: {err}")

    async def stop_model_monitor(self):
        """Stop monitoring the model paths for changes.

        Safe to call when monitoring was never started.
        """
        if self._select is not None:
            await self._select.stop()
        if self._monitoring_task is not None:
            await cancel_and_await(self._monitoring_task)

    async def join(self) -> None:
        """Await the monitoring task, and return when the task completes."""
        if self._monitoring_task is not None and not self._monitoring_task.done():
            await self._monitoring_task

    def _load_model(self, model_path: str) -> T:
        """Load the model file.

        Args:
            model_path: the model path to be loaded.

        Returns:
            the model instance.

        Raises:
            FileNotFoundError: if ``model_path`` does not exist.
        """
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"The model path {model_path} does not exist.")
        # NOTE: unpickling can execute arbitrary code; model files must be trusted.
        with open(model_path, "rb") as file_obj:
            return pickle.load(file_obj)


# And these are the models: SingleModel
# (The SingleModel module starts with: from __future__ import annotations)
import asyncio
from datetime import datetime
from typing import TypeVar
from ._handler import ModelHandler
T = TypeVar("T")


class SingleModel(ModelHandler):
    """Load, store and retrieve a single model."""

    def __init__(
        self,
        model_path: str,
    ) -> None:
        """Handle a single model.

        The model file is loaded immediately and then monitored for changes by a
        task created with ``asyncio.create_task()``, so the instance must be
        created while an asyncio event loop is running.

        Args:
            model_path: the path to the model file.
        """
        super().__init__()
        self._model_path: str = model_path
        self._model: T = self._load_model(model_path)
        self._monitoring_task: asyncio.Task[None] = asyncio.create_task(
            self.start_model_monitor([self._model_path])
        )

    def get_model(self, index: int | str | datetime = 0) -> T:
        """Get the model for the given index.

        Args:
            index: index to get the model (not used).

        Returns:
            the model instance.
        """
        return self._model

    def reload_model(self, model_path: str):
        """Reload the model when the model file is updated.

        Paths other than the handled model file are ignored.

        Args:
            model_path: the model path to be reloaded.
        """
        import os  # this module does not import os at top level

        # The watcher may report the path in another form (e.g. absolute instead
        # of relative) than the one given to __init__, so compare normalized
        # absolute paths rather than raw strings.
        if os.path.abspath(model_path) == os.path.abspath(self._model_path):
            self._model = self._load_model(model_path)


# DayModel
# (The DayModel module starts with: from __future__ import annotations)
import asyncio
import calendar
import os
from datetime import datetime
from typing import TypeVar
from ._handler import ModelHandler
T = TypeVar("T")


class DayModel(ModelHandler):
    """Load, update, store and retrieve a separate model for each day period."""

    # Default mapping of lowercase weekday abbreviations to the numbers returned
    # by datetime.weekday() (calendar.MONDAY == 0 ... Sunday == 6). The names are
    # spelled out rather than taken from calendar.day_abbr, which follows the
    # current locale and would change the expected directory names.
    weekday_to_number: dict[str, int] = {
        day: calendar.MONDAY + offset
        for offset, day in enumerate(("mon", "tue", "wed", "thu", "fri", "sat", "sun"))
    }

    def __init__(
        self,
        model_dir_prefix: str,
        model_dir_suffix: str = "",
        model_file_name: str = "model.pkl",
        period_to_number: dict[str, int] | None = None,
    ) -> None:
        """Handle separate models for each time period.

        All models are loaded immediately and then monitored for changes by a
        task created with ``asyncio.create_task()``, so the instance must be
        created while an asyncio event loop is running.

        Args:
            model_dir_prefix: prefix path to the directory with the models definition.
            model_dir_suffix: suffix path to the directory with the models definition.
            model_file_name: the model file name.
            period_to_number: custom mapping of day periods to numbers; defaults
                to ``weekday_to_number``.

        Note:
            os.path.join(model_dir_prefix, time_period, model_dir_suffix,
            model_file_name) should be the path to the file with the model
            definition.
        """
        super().__init__()
        # Copy so later changes to the caller's (or the class-level) mapping
        # cannot get out of sync with the loaded models.
        self._period_to_number: dict[str, int] = dict(
            self.weekday_to_number if period_to_number is None else period_to_number
        )
        # model file path -> period name, used to map file events back to models.
        self._model_paths: dict[str, str] = {
            os.path.join(
                model_dir_prefix, period, model_dir_suffix, model_file_name
            ): period
            for period in self._period_to_number
        }
        self._models: dict[int, T] = {}
        for model_path, period in self._model_paths.items():
            period_num = self._period_to_number[period]
            self._models[period_num] = self._load_model(model_path)
        self._monitoring_task: asyncio.Task[None] = asyncio.create_task(
            self.start_model_monitor(list(self._model_paths))
        )

    def get_model(self, index: int | str | datetime = 0) -> T:
        """Get the model for the given index.

        Args:
            index: the period number (int), the period name (str, e.g. "tue"
                with the default mapping) or a datetime, which is mapped through
                ``datetime.weekday()`` (Monday == 0 ... Sunday == 6) and so only
                matches mappings that use those numbers.

        Returns:
            the model instance.

        Raises:
            KeyError: if no model exists for the given index.
        """
        if isinstance(index, int):
            return self._models[index]
        if isinstance(index, str):
            if index not in self._period_to_number:
                raise KeyError(
                    f"Unknown day period {index!r}; "
                    f"expected one of {sorted(self._period_to_number)}"
                )
            return self.get_model(self._period_to_number[index])
        return self.get_model(index.weekday())

    def reload_model(self, model_path: str):
        """Reload the model when the model file is updated.

        Paths that are not one of the handled model files are ignored.

        Args:
            model_path: the model path to be reloaded.
        """
        # The watcher may report paths in another form (e.g. absolute instead of
        # relative), so compare normalized absolute paths.
        target = os.path.abspath(model_path)
        for known_path, period in self._model_paths.items():
            if os.path.abspath(known_path) == target:
                period_num = self._period_to_number[period]
                self._models[period_num] = self._load_model(model_path)
                return


# IntervalModel
# (The IntervalModel module starts with: from __future__ import annotations)
import asyncio
import os
from datetime import datetime, timedelta
from typing import TypeVar
from ._handler import ModelHandler
T = TypeVar("T")


class IntervalModel(ModelHandler):
    """Load, update, store and retrieve a separate model for each interval period."""

    def __init__(
        self,
        model_dir_prefix: str,
        model_dir_suffix: str = "",
        model_file_name: str = "model.pkl",
        interval: timedelta = timedelta(minutes=15),
    ) -> None:
        """Handle separate models for each time period of a day.

        One model is loaded per interval of the day. Interval directories are
        named "HHMM" after the interval start (e.g. "0000", "0015", ... for
        15-minute intervals) and numbered consecutively from 0 at "0000". The
        models are monitored by a task created with ``asyncio.create_task()``,
        so the instance must be created while an asyncio event loop is running.

        Args:
            model_dir_prefix: prefix path to the directory with the models definition.
            model_dir_suffix: suffix path to the directory with the models definition.
            model_file_name: the model file name.
            interval: the interval period in which models are defined; a positive
                whole number of minutes that evenly divides one hour.

        Raises:
            ValueError: if ``interval`` does not satisfy the constraints above.

        Note:
            os.path.join(model_dir_prefix, period, model_dir_suffix,
            model_file_name) should be the path to the file with the model
            definition.
        """
        zero = timedelta(0)
        one_hour = timedelta(hours=1)
        one_minute = timedelta(minutes=1)
        # "HHMM" keys cannot represent sub-minute or fractional-minute intervals
        # (a sub-minute interval also made the range() step below zero).
        if interval <= zero or one_hour % interval != zero or interval % one_minute != zero:
            raise ValueError(
                "interval must be a positive whole number of minutes that "
                f"divides one hour, got {interval}"
            )
        super().__init__()

        step_minutes = interval // one_minute
        intervals_per_hour = one_hour // interval
        # "HHMM" interval start -> consecutive interval number.
        self._interval_to_number: dict[str, int] = {
            f"{h:02d}{m:02d}": h * intervals_per_hour + m // step_minutes
            for h in range(24)
            for m in range(0, 60, step_minutes)
        }
        self._interval: timedelta = interval
        self._interval_minutes: int = step_minutes
        # model file path -> "HHMM" period, used to map file events back to models.
        self._model_paths: dict[str, str] = {
            os.path.join(
                model_dir_prefix, period, model_dir_suffix, model_file_name
            ): period
            for period in self._interval_to_number
        }
        self._models: dict[int, T] = {}
        for model_path, period in self._model_paths.items():
            period_num = self._interval_to_number[period]
            self._models[period_num] = self._load_model(model_path)
        self._monitoring_task: asyncio.Task[None] = asyncio.create_task(
            self.start_model_monitor(list(self._model_paths))
        )

    def get_model(self, index: int | str | datetime = 0) -> T:
        """Get the model for the given index.

        Args:
            index: the interval number (int), the interval start as "HHMM"
                (str), or a datetime, which is floored to the start of its
                interval (e.g. 16:48 -> "1645" for 15-minute intervals).

        Returns:
            the model instance.

        Raises:
            KeyError: if no model exists for the given index.
        """
        if isinstance(index, int):
            return self._models[index]
        if isinstance(index, str):
            return self.get_model(self._interval_to_number[index])
        minutes = (index.minute // self._interval_minutes) * self._interval_minutes
        return self.get_model(f"{index.hour:02d}{minutes:02d}")

    def reload_model(self, model_path: str):
        """Reload the model when the model file is updated.

        Paths that are not one of the handled model files are ignored.

        Args:
            model_path: the model path to be reloaded.
        """
        # The watcher may report paths in another form (e.g. absolute instead of
        # relative), so compare normalized absolute paths.
        target = os.path.abspath(model_path)
        for known_path, period in self._model_paths.items():
            if os.path.abspath(known_path) == target:
                period_num = self._interval_to_number[period]
                self._models[period_num] = self._load_model(model_path)
                return


# Usage examples:
# Single
# The handlers create their file-monitoring task with asyncio.create_task() in
# __init__, so they have to be created while an event loop is running.
import asyncio
from datetime import datetime


async def main() -> None:
    """Show how to retrieve models from each kind of handler."""
    single_model = SingleModel(model_path="models/consumption/single/model.pkl")
    model = single_model.get_model()
    # Day
    day_model = DayModel(model_dir_prefix="models/consumption/day")
    model_mon = day_model.get_model(0)
    model_tue = day_model.get_model("tue")
    # 2023-05-21 is a Sunday, so this is the same model as get_model("sun").
    model_sun = day_model.get_model(datetime(2023, 5, 21))
    # Interval (15-minute intervals by default)
    interval_model = IntervalModel("models/consumption/interval")
    model_4 = interval_model.get_model(4)
    model_0100 = interval_model.get_model("0100")
    assert model_4 == model_0100  # 01:00 is interval number 4 (0-based)
    model_1745 = interval_model.get_model("1745")
    model_1645 = interval_model.get_model(datetime(2023, 5, 21, 16, 45))
    # 16:48 falls into the interval starting at 16:45.
    model_1648 = interval_model.get_model(datetime(2023, 5, 21, 16, 48))
    assert model_1645 == model_1648
    # Stop the background file monitors before the event loop shuts down.
    for handler in (single_model, day_model, interval_model):
        await handler.stop_model_monitor()


if __name__ == "__main__":
    asyncio.run(main())
Beta Was this translation helpful? Give feedback.
-
We already have several work-in-progress PRs, so I'm closing this; we can continue the discussion in the PRs.
Beta Was this translation helpful? Give feedback.
-
We will soon need to load pre-trained models to be used for predictions, so it would be good if the SDK can provide some framework to do this.
Requirements
`pickle` module
Possible base implementation
The example model loader below needs to be adjusted so that it is general enough for other forecasts with different loading mechanics.
You could invoke these classes as follows, but there are several ways to use them:
The day-to-number mapping can be passed as an optional argument with a default value. This way, the user can provide a custom mapping if needed.
The optional `model_init_args` argument can be used to pass additional arguments when initialising models. This allows custom parameters for models that require more than just a directory path. @idlir-shkurti-frequenz I'm not sure if that's needed, as I recall that we might also dump the model (hyper)parameters to a file.
Beta Was this translation helpful? Give feedback.
All reactions