Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get normalized scores #110

Merged
merged 11 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/api/minari_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ minari_dataset/episode_data
```{eval-rst}
.. autofunction:: minari.combine_datasets
```

## Normalize Score

```{eval-rst}
.. autofunction:: minari.get_normalized_score
```
2 changes: 2 additions & 0 deletions minari/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
combine_datasets,
create_dataset_from_buffers,
create_dataset_from_collector_env,
get_normalized_score,
split_dataset,
)

Expand All @@ -34,6 +35,7 @@
"create_dataset_from_buffers",
"create_dataset_from_collector_env",
"split_dataset",
"get_normalized_score",
]

__version__ = "0.3.1"
13 changes: 10 additions & 3 deletions minari/data_collector/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def _add_to_episode_buffer(
assert isinstance(
episode_buffer[key], dict
), f"Element to be inserted is type 'dict', but buffer accepts type {type(episode_buffer[key])}"

episode_buffer[key] = self._add_to_episode_buffer(
episode_buffer[key], value
)
Expand Down Expand Up @@ -203,7 +204,6 @@ def step(
terminated=terminated,
truncated=truncated,
)

# Force step data dictionary to include keys corresponding to Gymnasium step returns:
# actions, observations, rewards, terminations, truncations, and infos
assert STEP_DATA_KEYS.issubset(
Expand All @@ -230,7 +230,12 @@ def step(
# or truncation. This may happen if the step_data_callback truncates or terminates the episode under
# certain conditions.
if self._new_episode and not self._reset_called:
self._buffer[-1]["observations"] = [self._previous_eps_final_obs]
if isinstance(self._previous_eps_final_obs, dict):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix for dictionary observation spaces

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, I think it would be good to add a test for this behavior.

self._buffer[-1]["observations"] = self._add_to_episode_buffer(
{}, self._previous_eps_final_obs
)
else:
self._buffer[-1]["observations"] = [self._previous_eps_final_obs]
if self._record_infos:
self._buffer[-1]["infos"] = self._add_to_episode_buffer(
{}, self._previous_eps_final_info
Expand Down Expand Up @@ -447,7 +452,9 @@ def clear_buffer(dictionary_buffer: EpisodeBuffer, episode_group: h5py.Group):
# Clear in-memory buffers
self._buffer.clear()

def save_to_disk(self, path: str, dataset_metadata: Optional[Dict] = None):
def save_to_disk(
self, path: str, dataset_metadata: Optional[Dict[str, Any]] = None
):
"""Save all in-memory buffer data and move temporary HDF5 file to a permanent location in disk.

Args:
Expand Down
2 changes: 1 addition & 1 deletion minari/dataset/minari_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def episode_indices(self) -> np.ndarray:
"""Indices of the available episodes to sample within the Minari dataset."""
return self._episode_indices

def recover_environment(self):
def recover_environment(self) -> gym.Env:
"""Recover the Gymnasium environment used to create the dataset.

Returns:
Expand Down
155 changes: 147 additions & 8 deletions minari/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from __future__ import annotations

import copy
import os
import warnings
from typing import Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import gymnasium as gym
import h5py
import numpy as np
from gymnasium.core import ActType, ObsType
from gymnasium.envs.registration import EnvSpec
from gymnasium.wrappers import RecordEpisodeStatistics

from minari import DataCollectorV0
from minari.dataset.minari_dataset import MinariDataset
Expand All @@ -16,6 +19,19 @@
from minari.storage.datasets_root_dir import get_dataset_path


class RandomPolicy:
"""A random action selection policy to compute `ref_min_score`."""

def __init__(self, env: gym.Env):
self.action_space = env.action_space
self.action_space.seed(123)
self.observation_space = env.observation_space

def __call__(self, observation: ObsType) -> ActType:
assert self.observation_space.contains(observation)
return self.action_space.sample()


def combine_datasets(
datasets_to_combine: List[MinariDataset], new_dataset_id: str, copy: bool = False
):
Expand Down Expand Up @@ -163,6 +179,28 @@ def split_dataset(
return out_datasets


def get_average_reference_score(
env: gym.Env,
policy: Callable[[ObsType], ActType],
num_episodes: int,
) -> float:

env = RecordEpisodeStatistics(env, num_episodes)
episode_returns = []
obs, _ = env.reset(seed=123)
for _ in range(num_episodes):
while True:
action = policy(obs)
obs, _, terminated, truncated, info = env.step(action)
if terminated or truncated:
episode_returns.append(info["episode"]["r"])
obs, _ = env.reset()
break

mean_ref_score = np.mean(episode_returns, dtype=np.float32)
return float(mean_ref_score)


def create_dataset_from_buffers(
dataset_id: str,
env: gym.Env,
Expand All @@ -173,6 +211,10 @@ def create_dataset_from_buffers(
code_permalink: Optional[str] = None,
action_space: Optional[gym.spaces.Space] = None,
observation_space: Optional[gym.spaces.Space] = None,
ref_min_score: Optional[float] = None,
ref_max_score: Optional[float] = None,
expert_policy: Optional[Callable[[ObsType], ActType]] = None,
num_episodes_average_score: int = 100,
):
"""Create Minari dataset from a list of episode dictionary buffers.

Expand All @@ -197,6 +239,11 @@ def create_dataset_from_buffers(
author (Optional[str], optional): author that generated the dataset. Defaults to None.
author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None.
code_permalink (Optional[str], optional): link to relevant code used to generate the dataset. Defaults to None.
ref_min_score( Optional[float], optional): minimum reference score from the average returns of a random policy. This value is later used to normalize a score with :meth:`minari.get_normalized_score`. If default None the value will be estimated with a default random policy.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small typo with spacing ref_min_score( Optional[float]-> ref_min_score (Optional[float]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we even compute min score if max score is not provided? we won't be able to use it anyway

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already being done. First checking if an expert_policy or ref_max_score is passed and then computing the respective attributes. I'll update the docs, I agree it's not very clear that functionality

ref_max_score (Optional[float], optional: maximum reference score from the average returns of a hypothetical expert policy. This value is used in `MinariDataset.get_normalized_score()`. Default None.
expert_policy (Optional[Callable[[ObsType], ActType], optional): policy to compute `ref_max_score` by averaging the returns over a number of episodes equal to `num_episodes_average_score`.
`ref_max_score` and `expert_policy` can't be passed at the same time. Default to None
num_episodes_average_score (int): number of episodes to average over the returns to compute `ref_min_score` and `ref_max_score`. Default to 100.

Returns:
MinariDataset
Expand All @@ -223,6 +270,11 @@ def create_dataset_from_buffers(
if action_space is None:
action_space = env.action_space

if expert_policy is not None and ref_max_score is not None:
raise ValueError(
"Can't pass a value for `expert_policy` and `ref_max_score` at the same time."
)

dataset_path = get_dataset_path(dataset_id)

# Check if dataset already exists
Expand Down Expand Up @@ -275,6 +327,22 @@ def create_dataset_from_buffers(
file.attrs["action_space"] = action_space_str
file.attrs["observation_space"] = observation_space_str

if expert_policy is not None or ref_max_score is not None:
env = copy.deepcopy(env)
if ref_min_score is None:
ref_min_score = get_average_reference_score(
env, RandomPolicy(env), num_episodes_average_score
)

if expert_policy is not None:
ref_max_score = get_average_reference_score(
env, expert_policy, num_episodes_average_score
)

file.attrs["ref_max_score"] = ref_max_score
file.attrs["ref_min_score"] = ref_min_score
file.attrs["num_episodes_average_score"] = num_episodes_average_score

return MinariDataset(data_path)
else:
raise ValueError(
Expand All @@ -289,6 +357,10 @@ def create_dataset_from_collector_env(
author: Optional[str] = None,
author_email: Optional[str] = None,
code_permalink: Optional[str] = None,
ref_min_score: Optional[float] = None,
ref_max_score: Optional[float] = None,
expert_policy: Optional[Callable[[ObsType], ActType]] = None,
num_episodes_average_score: int = 100,
):
"""Create a Minari dataset using the data collected from stepping with a Gymnasium environment wrapped with a `DataCollectorV0` Minari wrapper.

Expand All @@ -304,6 +376,11 @@ def create_dataset_from_collector_env(
author (Optional[str], optional): author that generated the dataset. Defaults to None.
author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None.
code_permalink (Optional[str], optional): link to relevant code used to generate the dataset. Defaults to None.
ref_min_score( Optional[float], optional): minimum reference score from the average returns of a random policy. This value is later used to normalize a score with :meth:`minari.get_normalized_score`. If default None the value will be estimated with a default random policy.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spacing typo

ref_max_score (Optional[float], optional: maximum reference score from the average returns of a hypothetical expert policy. This value is used in :meth:`minari.get_normalized_score`. Default None.
expert_policy (Optional[Callable[[ObsType], ActType], optional): policy to compute `ref_max_score` by averaging the returns over a number of episodes equal to `num_episodes_average_score`.
`ref_max_score` and `expert_policy` can't be passed at the same time. Default to None
num_episodes_average_score (int): number of episodes to average over the returns to compute `ref_min_score` and `ref_max_score`. Default to 100.

Returns:
MinariDataset
Expand All @@ -324,6 +401,10 @@ def create_dataset_from_collector_env(
"`author_email` is set to None. For longevity purposes it is highly recommended to provide an author email, or some other obvious contact information.",
UserWarning,
)
if expert_policy is not None and ref_max_score is not None:
raise ValueError(
"Can't pass a value for `expert_policy` and `ref_max_score` at the same time."
)

assert collector_env.datasets_path is not None
dataset_path = os.path.join(collector_env.datasets_path, dataset_id)
Expand All @@ -333,18 +414,76 @@ def create_dataset_from_collector_env(
dataset_path = os.path.join(dataset_path, "data")
os.makedirs(dataset_path)
data_path = os.path.join(dataset_path, "main_data.hdf5")
dataset_metadata: Dict[str, Any] = {
"dataset_id": str(dataset_id),
"algorithm_name": str(algorithm_name),
"author": str(author),
"author_email": str(author_email),
"code_permalink": str(code_permalink),
}

if expert_policy is not None or ref_max_score is not None:
env = copy.deepcopy(collector_env.env)
if ref_min_score is None:
ref_min_score = get_average_reference_score(
env, RandomPolicy(env), num_episodes_average_score
)

if expert_policy is not None:
ref_max_score = get_average_reference_score(
env, expert_policy, num_episodes_average_score
)
Comment on lines +426 to +436
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate of 330-340, it can be factorized
Actually, I think we should try to refactor create_dataset_from_collector_env+ DataCollectorV0.save_to_diskand create_dataset_from_buffers, but we can do it in another PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I'll update this section and then we can look into making other refactors

dataset_metadata.update(
{
"ref_max_score": ref_max_score,
"ref_min_score": ref_min_score,
"num_episodes_average_score": num_episodes_average_score,
}
)

collector_env.save_to_disk(
data_path,
dataset_metadata={
"dataset_id": str(dataset_id),
"algorithm_name": str(algorithm_name),
"author": str(author),
"author_email": str(author_email),
"code_permalink": str(code_permalink),
},
dataset_metadata=dataset_metadata,
)
return MinariDataset(data_path)
else:
raise ValueError(
f"A Minari dataset with ID {dataset_id} already exists and it cannot be overridden. Please use a different dataset name or version."
)


def get_normalized_score(
dataset: MinariDataset, returns: Union[float, np.float32]
) -> Union[float, np.float32]:
r"""Normalize undiscounted return of an episode.

This function was originally provided in the `D4RL repository <https://github.com/Farama-Foundation/D4RL/blob/71a9549f2091accff93eeff68f1f3ab2c0e0a288/d4rl/offline_env.py#L71>`_.
The computed normalized episode return (normalized score) facilitates the comparison of algorithm performance across different tasks. The returned normalized score will be in a range between 0 and 1.
Where 0 corresponds to the minimum reference score calculated as the average of episode returns collected from a random policy in the environment, and 1 corresponds to a maximum reference score computed as
the average of episode returns from an hypothetical expert policy. These two values are stored as optional attributes in a MinariDataset as `ref_min_score` and `ref_max_score` respectively.

The formula to normalize an episode return is:

.. math:: normalize\_score = \frac{return - ref\_min\_score}{ref\_max\_score - ref\_min\_score}

.. warning:: This utility function is under testing and will not be available in every Minari dataset. For now, only the datasets imported from D4RL will contain the `ref_min_score` and `ref_max_score` attributes.

Args:
dataset (MinariDataset): the MinariDataset with respect to which normalize the score. Must contain the reference score attributes `ref_min_score` and `ref_max_score`.
returns (float | np.float32): a single value or array of episode undiscounted returns to normalize.

Returns:
normalized_scores
"""
with h5py.File(dataset.spec.data_path, "r") as f:
ref_min_score = f.attrs.get("ref_min_score", default=None)
ref_max_score = f.attrs.get("ref_max_score", default=None)
if ref_min_score is None or ref_max_score is None:
raise ValueError(
f"Reference score not provided for dataset {dataset.spec.dataset_id}. Can't compute the normalized score."
Comment on lines +479 to +484
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to access those attributes through MinariStorage
Should we make the MinariDataset._data gettable?

Copy link
Member Author

@rodrigodelazcano rodrigodelazcano Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to avoid extra optional attributes in the MinariStorage and MinariDataset. What we can do is move this function to the MinariStorage (that uses h5py`), and then call this function throught another method in MinariDataset.

def get_normalized_score(self, score):
    return self._data.get_normalized_score(score)

Do you think this is a better option? In my opinion I prefer the current implementation to keep this feature as separated as possible from the dataset structures until we decide if we are going to maintain it at all.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible get_normalized_score will be totally removed from the codebase? I think it depends on what you mean by not maintaining it. If it's possible get_normalized_score will be totally removed, then I think the h5py is fine for now, but if get_normalized_scores is likely to stick around in any form in the long run, I think it should be a function of MinariStorage exactly as you described.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to avoid extra optional attributes in the MinariStorage and MinariDataset. What we can do is move this function to the MinariStorage (that uses h5py`), and then call this function throught another method in MinariDataset.

def get_normalized_score(self, score):
    return self._data.get_normalized_score(score)

Do you think this is a better option? In my opinion I prefer the current implementation to keep this feature as separated as possible from the dataset structures until we decide if we are going to maintain it at all.

I prefer the current implementation. I was also thinking on a general method get_attr to get some attribute from the data (and throw an error if there is no such attribute)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of get_attr can you add this in a separe PR @younik?

)

assert isinstance(ref_min_score, float)
assert isinstance(ref_max_score, float)

return (returns - ref_min_score) / (ref_max_score - ref_min_score)