diff --git a/docs/api/minari_functions.md b/docs/api/minari_functions.md index 4cd30f86..53a31815 100644 --- a/docs/api/minari_functions.md +++ b/docs/api/minari_functions.md @@ -50,3 +50,9 @@ minari_dataset/episode_data ```{eval-rst} .. autofunction:: minari.combine_datasets ``` + +## Normalize Score + +```{eval-rst} +.. autofunction:: minari.get_normalized_score +``` \ No newline at end of file diff --git a/minari/__init__.py b/minari/__init__.py index 73840a32..cf8bdde3 100644 --- a/minari/__init__.py +++ b/minari/__init__.py @@ -11,6 +11,7 @@ combine_datasets, create_dataset_from_buffers, create_dataset_from_collector_env, + get_normalized_score, split_dataset, ) @@ -34,6 +35,7 @@ "create_dataset_from_buffers", "create_dataset_from_collector_env", "split_dataset", + "get_normalized_score", ] __version__ = "0.3.1" diff --git a/minari/data_collector/data_collector.py b/minari/data_collector/data_collector.py index 95252518..ed6cd3b2 100644 --- a/minari/data_collector/data_collector.py +++ b/minari/data_collector/data_collector.py @@ -176,6 +176,7 @@ def _add_to_episode_buffer( assert isinstance( episode_buffer[key], dict ), f"Element to be inserted is type 'dict', but buffer accepts type {type(episode_buffer[key])}" + episode_buffer[key] = self._add_to_episode_buffer( episode_buffer[key], value ) @@ -203,7 +204,6 @@ def step( terminated=terminated, truncated=truncated, ) - # Force step data dictionary to include keys corresponding to Gymnasium step returns: # actions, observations, rewards, terminations, truncations, and infos assert STEP_DATA_KEYS.issubset( @@ -230,7 +230,12 @@ def step( # or truncation. This may happen if the step_data_callback truncates or terminates the episode under # certain conditions. if self._new_episode and not self._reset_called: - self._buffer[-1]["observations"] = [self._previous_eps_final_obs] + if isinstance(self._previous_eps_final_obs, dict): + self._buffer[-1]["observations"] = self._add_to_episode_buffer( + {}, self._previous_eps_final_obs + ) + else: + self._buffer[-1]["observations"] = [self._previous_eps_final_obs] if self._record_infos: self._buffer[-1]["infos"] = self._add_to_episode_buffer( {}, self._previous_eps_final_info @@ -447,7 +452,9 @@ def clear_buffer(dictionary_buffer: EpisodeBuffer, episode_group: h5py.Group): # Clear in-memory buffers self._buffer.clear() - def save_to_disk(self, path: str, dataset_metadata: Optional[Dict] = None): + def save_to_disk( + self, path: str, dataset_metadata: Optional[Dict[str, Any]] = None + ): """Save all in-memory buffer data and move temporary HDF5 file to a permanent location in disk. Args: diff --git a/minari/dataset/minari_dataset.py b/minari/dataset/minari_dataset.py index 72923d9e..4e91a0a0 100644 --- a/minari/dataset/minari_dataset.py +++ b/minari/dataset/minari_dataset.py @@ -181,7 +181,7 @@ def episode_indices(self) -> np.ndarray: """Indices of the available episodes to sample within the Minari dataset.""" return self._episode_indices - def recover_environment(self): + def recover_environment(self) -> gym.Env: """Recover the Gymnasium environment used to create the dataset. Returns: diff --git a/minari/utils.py b/minari/utils.py index 373f93f6..7bf3abb7 100644 --- a/minari/utils.py +++ b/minari/utils.py @@ -1,13 +1,16 @@ from __future__ import annotations +import copy import os import warnings -from typing import Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union import gymnasium as gym import h5py import numpy as np +from gymnasium.core import ActType, ObsType from gymnasium.envs.registration import EnvSpec +from gymnasium.wrappers import RecordEpisodeStatistics from minari import DataCollectorV0 from minari.dataset.minari_dataset import MinariDataset @@ -16,6 +19,19 @@ from minari.storage.datasets_root_dir import get_dataset_path +class RandomPolicy: + """A random action selection policy to compute `ref_min_score`.""" + + def __init__(self, env: gym.Env): + self.action_space = env.action_space + self.action_space.seed(123) + self.observation_space = env.observation_space + + def __call__(self, observation: ObsType) -> ActType: + assert self.observation_space.contains(observation) + return self.action_space.sample() + + def combine_datasets( datasets_to_combine: List[MinariDataset], new_dataset_id: str, copy: bool = False ): @@ -163,6 +179,28 @@ def split_dataset( return out_datasets +def get_average_reference_score( + env: gym.Env, + policy: Callable[[ObsType], ActType], + num_episodes: int, +) -> float: + + env = RecordEpisodeStatistics(env, num_episodes) + episode_returns = [] + obs, _ = env.reset(seed=123) + for _ in range(num_episodes): + while True: + action = policy(obs) + obs, _, terminated, truncated, info = env.step(action) + if terminated or truncated: + episode_returns.append(info["episode"]["r"]) + obs, _ = env.reset() + break + + mean_ref_score = np.mean(episode_returns, dtype=np.float32) + return float(mean_ref_score) + + def create_dataset_from_buffers( dataset_id: str, env: gym.Env, @@ -173,6 +211,10 @@ def create_dataset_from_buffers( code_permalink: Optional[str] = None, action_space: Optional[gym.spaces.Space] = None, observation_space: Optional[gym.spaces.Space] = None, + ref_min_score: Optional[float] = None, + ref_max_score: Optional[float] = None, + expert_policy: Optional[Callable[[ObsType], ActType]] = None, + num_episodes_average_score: int = 100, ): """Create Minari dataset from a list of episode dictionary buffers. @@ -197,6 +239,12 @@ def create_dataset_from_buffers( author (Optional[str], optional): author that generated the dataset. Defaults to None. author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None. code_permalink (Optional[str], optional): link to relevant code used to generate the dataset. Defaults to None. + ref_min_score (Optional[float], optional): minimum reference score from the average returns of a random policy. This value is later used to normalize a score with :meth:`minari.get_normalized_score`. If default None the value will be estimated with a default random policy. + Also note that this attribute will be added to the Minari dataset only if `ref_max_score` or `expert_policy` are assigned a valid value other than None. + ref_max_score (Optional[float], optional: maximum reference score from the average returns of a hypothetical expert policy. This value is used in `MinariDataset.get_normalized_score()`. Default None. + expert_policy (Optional[Callable[[ObsType], ActType], optional): policy to compute `ref_max_score` by averaging the returns over a number of episodes equal to `num_episodes_average_score`. + `ref_max_score` and `expert_policy` can't be passed at the same time. Default to None + num_episodes_average_score (int): number of episodes to average over the returns to compute `ref_min_score` and `ref_max_score`. Default to 100. Returns: MinariDataset @@ -223,6 +271,11 @@ def create_dataset_from_buffers( if action_space is None: action_space = env.action_space + if expert_policy is not None and ref_max_score is not None: + raise ValueError( + "Can't pass a value for `expert_policy` and `ref_max_score` at the same time." + ) + dataset_path = get_dataset_path(dataset_id) # Check if dataset already exists @@ -275,6 +328,22 @@ def create_dataset_from_buffers( file.attrs["action_space"] = action_space_str file.attrs["observation_space"] = observation_space_str + if expert_policy is not None or ref_max_score is not None: + env = copy.deepcopy(env) + if ref_min_score is None: + ref_min_score = get_average_reference_score( + env, RandomPolicy(env), num_episodes_average_score + ) + + if expert_policy is not None: + ref_max_score = get_average_reference_score( + env, expert_policy, num_episodes_average_score + ) + + file.attrs["ref_max_score"] = ref_max_score + file.attrs["ref_min_score"] = ref_min_score + file.attrs["num_episodes_average_score"] = num_episodes_average_score + return MinariDataset(data_path) else: raise ValueError( @@ -289,6 +358,10 @@ def create_dataset_from_collector_env( author: Optional[str] = None, author_email: Optional[str] = None, code_permalink: Optional[str] = None, + ref_min_score: Optional[float] = None, + ref_max_score: Optional[float] = None, + expert_policy: Optional[Callable[[ObsType], ActType]] = None, + num_episodes_average_score: int = 100, ): """Create a Minari dataset using the data collected from stepping with a Gymnasium environment wrapped with a `DataCollectorV0` Minari wrapper. @@ -304,6 +377,11 @@ def create_dataset_from_collector_env( author (Optional[str], optional): author that generated the dataset. Defaults to None. author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None. code_permalink (Optional[str], optional): link to relevant code used to generate the dataset. Defaults to None. + ref_min_score( Optional[float], optional): minimum reference score from the average returns of a random policy. This value is later used to normalize a score with :meth:`minari.get_normalized_score`. If default None the value will be estimated with a default random policy. + ref_max_score (Optional[float], optional: maximum reference score from the average returns of a hypothetical expert policy. This value is used in :meth:`minari.get_normalized_score`. Default None. + expert_policy (Optional[Callable[[ObsType], ActType], optional): policy to compute `ref_max_score` by averaging the returns over a number of episodes equal to `num_episodes_average_score`. + `ref_max_score` and `expert_policy` can't be passed at the same time. Default to None + num_episodes_average_score (int): number of episodes to average over the returns to compute `ref_min_score` and `ref_max_score`. Default to 100. Returns: MinariDataset @@ -324,6 +402,10 @@ def create_dataset_from_collector_env( "`author_email` is set to None. For longevity purposes it is highly recommended to provide an author email, or some other obvious contact information.", UserWarning, ) + if expert_policy is not None and ref_max_score is not None: + raise ValueError( + "Can't pass a value for `expert_policy` and `ref_max_score` at the same time." + ) assert collector_env.datasets_path is not None dataset_path = os.path.join(collector_env.datasets_path, dataset_id) @@ -333,18 +415,76 @@ def create_dataset_from_collector_env( dataset_path = os.path.join(dataset_path, "data") os.makedirs(dataset_path) data_path = os.path.join(dataset_path, "main_data.hdf5") + dataset_metadata: Dict[str, Any] = { + "dataset_id": str(dataset_id), + "algorithm_name": str(algorithm_name), + "author": str(author), + "author_email": str(author_email), + "code_permalink": str(code_permalink), + } + + if expert_policy is not None or ref_max_score is not None: + env = copy.deepcopy(collector_env.env) + if ref_min_score is None: + ref_min_score = get_average_reference_score( + env, RandomPolicy(env), num_episodes_average_score + ) + + if expert_policy is not None: + ref_max_score = get_average_reference_score( + env, expert_policy, num_episodes_average_score + ) + dataset_metadata.update( + { + "ref_max_score": ref_max_score, + "ref_min_score": ref_min_score, + "num_episodes_average_score": num_episodes_average_score, + } + ) + collector_env.save_to_disk( data_path, - dataset_metadata={ - "dataset_id": str(dataset_id), - "algorithm_name": str(algorithm_name), - "author": str(author), - "author_email": str(author_email), - "code_permalink": str(code_permalink), - }, + dataset_metadata=dataset_metadata, ) return MinariDataset(data_path) else: raise ValueError( f"A Minari dataset with ID {dataset_id} already exists and it cannot be overridden. Please use a different dataset name or version." ) + + +def get_normalized_score( + dataset: MinariDataset, returns: Union[float, np.float32] +) -> Union[float, np.float32]: + r"""Normalize undiscounted return of an episode. + + This function was originally provided in the `D4RL repository `_. + The computed normalized episode return (normalized score) facilitates the comparison of algorithm performance across different tasks. The returned normalized score will be in a range between 0 and 1. + Where 0 corresponds to the minimum reference score calculated as the average of episode returns collected from a random policy in the environment, and 1 corresponds to a maximum reference score computed as + the average of episode returns from an hypothetical expert policy. These two values are stored as optional attributes in a MinariDataset as `ref_min_score` and `ref_max_score` respectively. + + The formula to normalize an episode return is: + + .. math:: normalize\_score = \frac{return - ref\_min\_score}{ref\_max\_score - ref\_min\_score} + + .. warning:: This utility function is under testing and will not be available in every Minari dataset. For now, only the datasets imported from D4RL will contain the `ref_min_score` and `ref_max_score` attributes. + + Args: + dataset (MinariDataset): the MinariDataset with respect to which normalize the score. Must contain the reference score attributes `ref_min_score` and `ref_max_score`. + returns (float | np.float32): a single value or array of episode undiscounted returns to normalize. + + Returns: + normalized_scores + """ + with h5py.File(dataset.spec.data_path, "r") as f: + ref_min_score = f.attrs.get("ref_min_score", default=None) + ref_max_score = f.attrs.get("ref_max_score", default=None) + if ref_min_score is None or ref_max_score is None: + raise ValueError( + f"Reference score not provided for dataset {dataset.spec.dataset_id}. Can't compute the normalized score." + ) + + assert isinstance(ref_min_score, float) + assert isinstance(ref_max_score, float) + + return (returns - ref_min_score) / (ref_max_score - ref_min_score) diff --git a/tests/common.py b/tests/common.py index e97e34c8..e4cdc1dc 100644 --- a/tests/common.py +++ b/tests/common.py @@ -57,7 +57,7 @@ def reset(self, seed=None, options=None): return self.observation_space.sample(), {} -class DummyTupleDisceteBoxEnv(gym.Env): +class DummyTupleDiscreteBoxEnv(gym.Env): def __init__(self): self.action_space = spaces.Tuple( ( @@ -235,8 +235,8 @@ def register_dummy_envs(): ) register( - id="DummyTupleDisceteBoxEnv-v0", - entry_point="tests.common:DummyTupleDisceteBoxEnv", + id="DummyTupleDiscreteBoxEnv-v0", + entry_point="tests.common:DummyTupleDiscreteBoxEnv", max_episode_steps=5, ) @@ -584,7 +584,7 @@ def check_episode_data_integrity( observation_space: gym.spaces.Space, action_space: gym.spaces.Space, ): - """Checks to see if a list of EpisodeData insteances has consistent data and that the observations and actions are in the appropriate spaces. + """Checks to see if a list of EpisodeData instances has consistent data and that the observations and actions are in the appropriate spaces. Args: episode_data_list (List[EpisodeData]): A list of EpisodeData instances representing episodes. diff --git a/tests/data_collector/test_data_collector.py b/tests/data_collector/test_data_collector.py new file mode 100644 index 00000000..63907a0f --- /dev/null +++ b/tests/data_collector/test_data_collector.py @@ -0,0 +1,146 @@ +import gymnasium as gym +import numpy as np +import pytest + +import minari +from minari import DataCollectorV0, EpisodeData, MinariDataset, StepDataCallback +from tests.common import check_load_and_delete_dataset, register_dummy_envs + + +register_dummy_envs() + + +class ForceTruncateStepDataCallback(StepDataCallback): + episode_steps = 10 + + def __init__(self) -> None: + super().__init__() + self.time_steps = 0 + + def __call__(self, env, **kwargs): + step_data = super().__call__(env, **kwargs) + + step_data["terminations"] = False + if self.time_steps % self.episode_steps == 0: + step_data["truncations"] = True + + self.time_steps += 1 + return step_data + + +def _get_step_from_dictionary_space(episode_data, step_index): + step_data = {} + assert isinstance(episode_data, dict) + for key, value in episode_data.items(): + if isinstance(value, dict): + step_data[key] = _get_step_from_dictionary_space(value, step_index) + elif isinstance(value, tuple): + step_data[key] = _get_step_from_tuple_space(value, step_index) + else: + step_data[key] = value[step_index] + return step_data + + +def _get_step_from_tuple_space(episode_data, step_index): + step_data = [] + assert isinstance(episode_data, tuple) + for element in episode_data: + if isinstance(element, dict): + step_data.append(_get_step_from_dictionary_space(element, step_index)) + elif isinstance(element, tuple): + step_data.append(_get_step_from_tuple_space(element, step_index)) + else: + step_data.append(element[step_index]) + + return tuple(step_data) + + +def get_single_step_from_episode(episode: EpisodeData, index: int) -> EpisodeData: + """Get a single step EpisodeData from a full episode.""" + if isinstance(episode.observations, dict): + observation = _get_step_from_dictionary_space(episode.observations, index) + elif isinstance(episode.observations, tuple): + observation = _get_step_from_tuple_space(episode.observations, index) + else: + observation = episode.observations[index] + if isinstance(episode.actions, dict): + action = _get_step_from_dictionary_space(episode.actions, index) + elif isinstance(episode.actions, tuple): + action = _get_step_from_tuple_space(episode.actions, index) + else: + action = episode.actions[index] + + step_data = { + "id": episode.id, + "total_timesteps": 1, + "seed": None, + "observations": observation, + "actions": action, + "rewards": episode.rewards[index], + "terminations": episode.terminations[index], + "truncations": episode.truncations[index], + } + + return EpisodeData(**step_data) + + +@pytest.mark.parametrize( + "dataset_id,env_id", + [ + ("dummy-dict-test-v0", "DummyDictEnv-v0"), + ("dummy-box-test-v0", "DummyBoxEnv-v0"), + ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), + ("dummy-combo-test-v0", "DummyComboEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), + ], +) +def test_truncation_without_reset(dataset_id, env_id): + """Test new episode creation when environment is truncated and env.reset is not called.""" + num_steps = 50 + num_episodes = int(num_steps / ForceTruncateStepDataCallback.episode_steps) + env = gym.make(env_id, max_episode_steps=50) + env = DataCollectorV0( + env, + step_data_callback=ForceTruncateStepDataCallback, + ) + + env.reset() + + for _ in range(num_steps): + env.step(env.action_space.sample()) + + dataset = minari.create_dataset_from_collector_env( + dataset_id=dataset_id, + collector_env=env, + algorithm_name="random_policy", + author="Farama", + author_email="farama@farama.org", + ) + + env.close() + + assert isinstance(dataset, MinariDataset) + assert dataset.total_episodes == num_episodes + assert dataset.spec.total_episodes == num_episodes + assert len(dataset.episode_indices) == num_episodes + + episodes_generator = dataset.iterate_episodes() + last_step = None + for episode in episodes_generator: + assert episode.total_timesteps == ForceTruncateStepDataCallback.episode_steps + if last_step is not None: + first_step = get_single_step_from_episode(episode, 0) + # Check that the last observation of the previous episode is carried over to the next episode + # as the reset observation. + if isinstance(first_step.observations, dict) or isinstance( + first_step.observations, tuple + ): + assert first_step.observations == last_step.observations + else: + assert np.array_equal(first_step.observations, last_step.observations) + last_step = get_single_step_from_episode(episode, -1) + print(last_step.truncations) + assert bool(last_step.truncations) is True + + # check load and delete local dataset + check_load_and_delete_dataset(dataset_id) diff --git a/tests/dataset/test_minari_dataset.py b/tests/dataset/test_minari_dataset.py index 5be3942f..5a2be869 100644 --- a/tests/dataset/test_minari_dataset.py +++ b/tests/dataset/test_minari_dataset.py @@ -63,7 +63,7 @@ def test_episode_data(space: gym.Space): ("dummy-box-test-v0", "DummyBoxEnv-v0"), ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_update_dataset_from_collector_env(dataset_id, env_id): @@ -119,7 +119,7 @@ def test_update_dataset_from_collector_env(dataset_id, env_id): ("dummy-box-test-v0", "DummyBoxEnv-v0"), ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_filter_episodes_and_subsequent_updates(dataset_id, env_id): @@ -301,7 +301,7 @@ def filter_by_index(episode: Any): ("dummy-box-test-v0", "DummyBoxEnv-v0"), ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_sample_episodes(dataset_id, env_id): @@ -344,7 +344,7 @@ def filter_by_index(episode: Any): ("dummy-box-test-v0", "DummyBoxEnv-v0"), ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_iterate_episodes(dataset_id, env_id): @@ -393,7 +393,7 @@ def test_iterate_episodes(dataset_id, env_id): ("dummy-box-test-v0", "DummyBoxEnv-v0"), ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_update_dataset_from_buffer(dataset_id, env_id): diff --git a/tests/utils/test_dataset_creation.py b/tests/utils/test_dataset_creation.py index 9dde0802..924ce12a 100644 --- a/tests/utils/test_dataset_creation.py +++ b/tests/utils/test_dataset_creation.py @@ -30,7 +30,7 @@ ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-text-test-v0", "DummyTextEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_generate_dataset_with_collector_env(dataset_id, env_id): @@ -95,7 +95,7 @@ def test_generate_dataset_with_collector_env(dataset_id, env_id): ("dummy-tuple-test-v0", "DummyTupleEnv-v0"), ("dummy-text-test-v0", "DummyTextEnv-v0"), ("dummy-combo-test-v0", "DummyComboEnv-v0"), - ("dummy-tuple-discrete-box-test-v0", "DummyTupleDisceteBoxEnv-v0"), + ("dummy-tuple-discrete-box-test-v0", "DummyTupleDiscreteBoxEnv-v0"), ], ) def test_generate_dataset_with_external_buffer(dataset_id, env_id): diff --git a/tests/utils/test_get_normalized_score.py b/tests/utils/test_get_normalized_score.py new file mode 100644 index 00000000..e69de29b