Get normalized scores (#110)
* add ref scores to create

* finish create datasets score

* get normalized score

* get_normalized_score

* ref scores create collector env

* assertion ref max score and expert policy

* pre-commit

* docstring update

* fix typo

* carry on previous obs test

* fix test

---------

Co-authored-by: rodrigodelazcano <[email protected]>
rodrigodelazcano authored Jul 17, 2023
1 parent 275ded1 commit cebc46c
Showing 10 changed files with 324 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/api/minari_functions.md
@@ -50,3 +50,9 @@ minari_dataset/episode_data
```{eval-rst}
.. autofunction:: minari.combine_datasets
```

## Normalize Score

```{eval-rst}
.. autofunction:: minari.get_normalized_score
```
2 changes: 2 additions & 0 deletions minari/__init__.py
@@ -11,6 +11,7 @@
combine_datasets,
create_dataset_from_buffers,
create_dataset_from_collector_env,
get_normalized_score,
split_dataset,
)

@@ -34,6 +35,7 @@
"create_dataset_from_buffers",
"create_dataset_from_collector_env",
"split_dataset",
"get_normalized_score",
]

__version__ = "0.3.1"
13 changes: 10 additions & 3 deletions minari/data_collector/data_collector.py
@@ -176,6 +176,7 @@ def _add_to_episode_buffer(
assert isinstance(
episode_buffer[key], dict
), f"Element to be inserted is type 'dict', but buffer accepts type {type(episode_buffer[key])}"

episode_buffer[key] = self._add_to_episode_buffer(
episode_buffer[key], value
)
@@ -203,7 +204,6 @@ def step(
terminated=terminated,
truncated=truncated,
)

# Force step data dictionary to include keys corresponding to Gymnasium step returns:
# actions, observations, rewards, terminations, truncations, and infos
assert STEP_DATA_KEYS.issubset(
@@ -230,7 +230,12 @@ def step(
# or truncation. This may happen if the step_data_callback truncates or terminates the episode under
# certain conditions.
if self._new_episode and not self._reset_called:
self._buffer[-1]["observations"] = [self._previous_eps_final_obs]
if isinstance(self._previous_eps_final_obs, dict):
self._buffer[-1]["observations"] = self._add_to_episode_buffer(
{}, self._previous_eps_final_obs
)
else:
self._buffer[-1]["observations"] = [self._previous_eps_final_obs]
if self._record_infos:
self._buffer[-1]["infos"] = self._add_to_episode_buffer(
{}, self._previous_eps_final_info
@@ -447,7 +452,9 @@ def clear_buffer(dictionary_buffer: EpisodeBuffer, episode_group: h5py.Group):
# Clear in-memory buffers
self._buffer.clear()

def save_to_disk(self, path: str, dataset_metadata: Optional[Dict] = None):
def save_to_disk(
self, path: str, dataset_metadata: Optional[Dict[str, Any]] = None
):
"""Save all in-memory buffer data and move temporary HDF5 file to a permanent location in disk.
Args:
2 changes: 1 addition & 1 deletion minari/dataset/minari_dataset.py
@@ -181,7 +181,7 @@ def episode_indices(self) -> np.ndarray:
"""Indices of the available episodes to sample within the Minari dataset."""
return self._episode_indices

def recover_environment(self):
def recover_environment(self) -> gym.Env:
"""Recover the Gymnasium environment used to create the dataset.
Returns:
156 changes: 148 additions & 8 deletions minari/utils.py
@@ -1,13 +1,16 @@
from __future__ import annotations

import copy
import os
import warnings
from typing import Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import gymnasium as gym
import h5py
import numpy as np
from gymnasium.core import ActType, ObsType
from gymnasium.envs.registration import EnvSpec
from gymnasium.wrappers import RecordEpisodeStatistics

from minari import DataCollectorV0
from minari.dataset.minari_dataset import MinariDataset
@@ -16,6 +19,19 @@
from minari.storage.datasets_root_dir import get_dataset_path


class RandomPolicy:
"""A random action selection policy to compute `ref_min_score`."""

def __init__(self, env: gym.Env):
self.action_space = env.action_space
self.action_space.seed(123)
self.observation_space = env.observation_space

def __call__(self, observation: ObsType) -> ActType:
assert self.observation_space.contains(observation)
return self.action_space.sample()


def combine_datasets(
datasets_to_combine: List[MinariDataset], new_dataset_id: str, copy: bool = False
):
@@ -163,6 +179,28 @@ def split_dataset(
return out_datasets


def get_average_reference_score(
env: gym.Env,
policy: Callable[[ObsType], ActType],
num_episodes: int,
) -> float:

env = RecordEpisodeStatistics(env, num_episodes)
episode_returns = []
obs, _ = env.reset(seed=123)
for _ in range(num_episodes):
while True:
action = policy(obs)
obs, _, terminated, truncated, info = env.step(action)
if terminated or truncated:
episode_returns.append(info["episode"]["r"])
obs, _ = env.reset()
break

mean_ref_score = np.mean(episode_returns, dtype=np.float32)
return float(mean_ref_score)
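
For context, a minimal sketch of how this helper pairs with `RandomPolicy` to estimate `ref_min_score` (the environment ID is hypothetical; the imports assume the definitions added in this diff):

```python
import gymnasium as gym

from minari.utils import RandomPolicy, get_average_reference_score

# Hypothetical environment; any Gymnasium env with a samplable action space works.
env = gym.make("CartPole-v1")

# Average undiscounted return of a uniformly random policy over 100 episodes;
# this becomes the 0-point of the normalized score scale.
ref_min_score = get_average_reference_score(env, RandomPolicy(env), 100)
print(f"ref_min_score: {ref_min_score:.2f}")
```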


def create_dataset_from_buffers(
dataset_id: str,
env: gym.Env,
@@ -173,6 +211,10 @@ def create_dataset_from_buffers(
code_permalink: Optional[str] = None,
action_space: Optional[gym.spaces.Space] = None,
observation_space: Optional[gym.spaces.Space] = None,
ref_min_score: Optional[float] = None,
ref_max_score: Optional[float] = None,
expert_policy: Optional[Callable[[ObsType], ActType]] = None,
num_episodes_average_score: int = 100,
):
"""Create Minari dataset from a list of episode dictionary buffers.
@@ -197,6 +239,12 @@
author (Optional[str], optional): author that generated the dataset. Defaults to None.
author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None.
code_permalink (Optional[str], optional): link to relevant code used to generate the dataset. Defaults to None.
ref_min_score (Optional[float], optional): minimum reference score from the average returns of a random policy. This value is later used to normalize a score with :meth:`minari.get_normalized_score`. If None (default), the value will be estimated with a default random policy.
Also note that this attribute will be added to the Minari dataset only if `ref_max_score` or `expert_policy` are assigned a valid value other than None.
ref_max_score (Optional[float], optional): maximum reference score from the average returns of a hypothetical expert policy. This value is used in :meth:`minari.get_normalized_score`. Defaults to None.
expert_policy (Optional[Callable[[ObsType], ActType]], optional): policy used to compute `ref_max_score` by averaging the returns over a number of episodes equal to `num_episodes_average_score`.
`ref_max_score` and `expert_policy` can't be passed at the same time. Defaults to None.
num_episodes_average_score (int): number of episodes over which to average the returns to compute `ref_min_score` and `ref_max_score`. Defaults to 100.
Returns:
MinariDataset
@@ -223,6 +271,11 @@
if action_space is None:
action_space = env.action_space

if expert_policy is not None and ref_max_score is not None:
raise ValueError(
"Can't pass a value for `expert_policy` and `ref_max_score` at the same time."
)

dataset_path = get_dataset_path(dataset_id)

# Check if dataset already exists
@@ -275,6 +328,22 @@
file.attrs["action_space"] = action_space_str
file.attrs["observation_space"] = observation_space_str

if expert_policy is not None or ref_max_score is not None:
env = copy.deepcopy(env)
if ref_min_score is None:
ref_min_score = get_average_reference_score(
env, RandomPolicy(env), num_episodes_average_score
)

if expert_policy is not None:
ref_max_score = get_average_reference_score(
env, expert_policy, num_episodes_average_score
)

file.attrs["ref_max_score"] = ref_max_score
file.attrs["ref_min_score"] = ref_min_score
file.attrs["num_episodes_average_score"] = num_episodes_average_score

return MinariDataset(data_path)
else:
raise ValueError(
@@ -289,6 +358,10 @@ def create_dataset_from_collector_env(
author: Optional[str] = None,
author_email: Optional[str] = None,
code_permalink: Optional[str] = None,
ref_min_score: Optional[float] = None,
ref_max_score: Optional[float] = None,
expert_policy: Optional[Callable[[ObsType], ActType]] = None,
num_episodes_average_score: int = 100,
):
"""Create a Minari dataset using the data collected from stepping with a Gymnasium environment wrapped with a `DataCollectorV0` Minari wrapper.
@@ -304,6 +377,11 @@
author (Optional[str], optional): author that generated the dataset. Defaults to None.
author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None.
code_permalink (Optional[str], optional): link to relevant code used to generate the dataset. Defaults to None.
ref_min_score (Optional[float], optional): minimum reference score from the average returns of a random policy. This value is later used to normalize a score with :meth:`minari.get_normalized_score`. If None (default), the value will be estimated with a default random policy.
ref_max_score (Optional[float], optional): maximum reference score from the average returns of a hypothetical expert policy. This value is used in :meth:`minari.get_normalized_score`. Defaults to None.
expert_policy (Optional[Callable[[ObsType], ActType]], optional): policy used to compute `ref_max_score` by averaging the returns over a number of episodes equal to `num_episodes_average_score`.
`ref_max_score` and `expert_policy` can't be passed at the same time. Defaults to None.
num_episodes_average_score (int): number of episodes over which to average the returns to compute `ref_min_score` and `ref_max_score`. Defaults to 100.
Returns:
MinariDataset
@@ -324,6 +402,10 @@
"`author_email` is set to None. For longevity purposes it is highly recommended to provide an author email, or some other obvious contact information.",
UserWarning,
)
if expert_policy is not None and ref_max_score is not None:
raise ValueError(
"Can't pass a value for `expert_policy` and `ref_max_score` at the same time."
)

assert collector_env.datasets_path is not None
dataset_path = os.path.join(collector_env.datasets_path, dataset_id)
@@ -333,18 +415,76 @@
dataset_path = os.path.join(dataset_path, "data")
os.makedirs(dataset_path)
data_path = os.path.join(dataset_path, "main_data.hdf5")
dataset_metadata: Dict[str, Any] = {
"dataset_id": str(dataset_id),
"algorithm_name": str(algorithm_name),
"author": str(author),
"author_email": str(author_email),
"code_permalink": str(code_permalink),
}

if expert_policy is not None or ref_max_score is not None:
env = copy.deepcopy(collector_env.env)
if ref_min_score is None:
ref_min_score = get_average_reference_score(
env, RandomPolicy(env), num_episodes_average_score
)

if expert_policy is not None:
ref_max_score = get_average_reference_score(
env, expert_policy, num_episodes_average_score
)
dataset_metadata.update(
{
"ref_max_score": ref_max_score,
"ref_min_score": ref_min_score,
"num_episodes_average_score": num_episodes_average_score,
}
)

collector_env.save_to_disk(
data_path,
dataset_metadata={
"dataset_id": str(dataset_id),
"algorithm_name": str(algorithm_name),
"author": str(author),
"author_email": str(author_email),
"code_permalink": str(code_permalink),
},
dataset_metadata=dataset_metadata,
)
return MinariDataset(data_path)
else:
raise ValueError(
f"A Minari dataset with ID {dataset_id} already exists and it cannot be overridden. Please use a different dataset name or version."
)


def get_normalized_score(
dataset: MinariDataset, returns: Union[float, np.float32]
) -> Union[float, np.float32]:
r"""Normalize undiscounted return of an episode.
This function was originally provided in the `D4RL repository <https://github.com/Farama-Foundation/D4RL/blob/71a9549f2091accff93eeff68f1f3ab2c0e0a288/d4rl/offline_env.py#L71>`_.
The computed normalized episode return (normalized score) facilitates the comparison of algorithm performance across different tasks. The returned normalized score will be in a range between 0 and 1,
where 0 corresponds to the minimum reference score, calculated as the average of episode returns collected from a random policy in the environment, and 1 corresponds to the maximum reference score, computed as
the average of episode returns from a hypothetical expert policy. These two values are stored as optional attributes in a MinariDataset as `ref_min_score` and `ref_max_score` respectively.
The formula to normalize an episode return is:
.. math:: normalized\_score = \frac{return - ref\_min\_score}{ref\_max\_score - ref\_min\_score}
.. warning:: This utility function is under testing and will not be available in every Minari dataset. For now, only the datasets imported from D4RL will contain the `ref_min_score` and `ref_max_score` attributes.
Args:
dataset (MinariDataset): the MinariDataset against which the score is normalized. Must contain the reference score attributes `ref_min_score` and `ref_max_score`.
returns (float | np.float32): a single value or array of episode undiscounted returns to normalize.
Returns:
normalized_scores
"""
with h5py.File(dataset.spec.data_path, "r") as f:
ref_min_score = f.attrs.get("ref_min_score", default=None)
ref_max_score = f.attrs.get("ref_max_score", default=None)
if ref_min_score is None or ref_max_score is None:
raise ValueError(
f"Reference score not provided for dataset {dataset.spec.dataset_id}. Can't compute the normalized score."
)

assert isinstance(ref_min_score, float)
assert isinstance(ref_max_score, float)

return (returns - ref_min_score) / (ref_max_score - ref_min_score)
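
A short usage sketch of the new function (the dataset ID is hypothetical; it assumes a dataset whose HDF5 file stores the `ref_min_score` and `ref_max_score` attributes):

```python
import minari

# Hypothetical dataset carrying the reference-score attributes,
# e.g. one imported from D4RL.
dataset = minari.load_dataset("door-human-v0")

# With ref_min_score = 10.0 and ref_max_score = 110.0, a return of 60.0
# normalizes to (60.0 - 10.0) / (110.0 - 10.0) = 0.5.
print(minari.get_normalized_score(dataset, returns=60.0))
```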
8 changes: 4 additions & 4 deletions tests/common.py
@@ -57,7 +57,7 @@ def reset(self, seed=None, options=None):
return self.observation_space.sample(), {}


class DummyTupleDisceteBoxEnv(gym.Env):
class DummyTupleDiscreteBoxEnv(gym.Env):
def __init__(self):
self.action_space = spaces.Tuple(
(
@@ -235,8 +235,8 @@ def register_dummy_envs():
)

register(
id="DummyTupleDisceteBoxEnv-v0",
entry_point="tests.common:DummyTupleDisceteBoxEnv",
id="DummyTupleDiscreteBoxEnv-v0",
entry_point="tests.common:DummyTupleDiscreteBoxEnv",
max_episode_steps=5,
)

@@ -584,7 +584,7 @@ def check_episode_data_integrity(
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
):
"""Checks to see if a list of EpisodeData insteances has consistent data and that the observations and actions are in the appropriate spaces.
"""Checks to see if a list of EpisodeData instances has consistent data and that the observations and actions are in the appropriate spaces.
Args:
episode_data_list (List[EpisodeData]): A list of EpisodeData instances representing episodes.