Added Action masking for Space.sample() (#2906)

* Allows a new RNG to be generated with seed=-1 and updated env_checker to fix bug if environment doesn't use np_random in reset

* Revert "fixed `gym.vector.make` where the checker was being applied in the opposite case than was intended to (#2871)"

This reverts commit 519dfd9117.

* Remove bad pushed commits

* Fixed spelling in core.py

* Pins pytest to the last py 3.6 version

* Add support for action masking in Space.sample(mask=...)

* Fix action mask

* Fix action_mask

* Fix action_mask

* Added docstrings, fixed bugs and added taxi examples

* Fixed bugs

* Add tests for sample

* Add docstrings and test space sample mask Discrete and MultiBinary

* Add MultiDiscrete sampling and tests

* Remove sample mask from graph

* Update gym/spaces/multi_discrete.py

Co-authored-by: Markus Krimmel <montcyril@gmail.com>

* Updates based on Marcus28 and jjshoots for Graph.py

* Updates based on Marcus28 and jjshoots for Graph.py

* jjshoot review

* jjshoot review

* Update assert check

* Update type hints

Co-authored-by: Markus Krimmel <montcyril@gmail.com>
This commit is contained in:
Mark Towers
2022-06-26 23:23:15 +01:00
committed by GitHub
parent d750eb8df0
commit 024b0f5160
11 changed files with 562 additions and 71 deletions

View File

@@ -87,6 +87,22 @@ class TaxiEnv(Env):
- 2: Y(ellow)
- 3: B(lue)
### Info
``step`` and ``reset(return_info=True)`` will return an info dictionary with the keys "prob" and "action_mask", containing
the probability of the transition that was taken and a mask of which actions will result in a change of state (this can be used to speed up training).
As Taxi's initial state is stochastic, the "prob" key represents the probability of the
transition; however, this value is currently bugged and is always 1.0. This will be fixed soon.
As the steps are deterministic, "prob" represents the probability of the transition, which is always 1.0.
For some cases, taking an action will have no effect on the state of the agent.
In v0.25.0, ``info["action_mask"]`` contains an np.ndarray with one entry per action, specifying
whether that action will change the state.
To sample a modifying action, use ``action = env.action_space.sample(info["action_mask"])``
Or with a Q-value based algorithm ``action = np.argmax(q_values[obs, np.where(info["action_mask"] == 1)[0]])``.
### Rewards
- -1 per step unless other reward is triggered.
- +20 delivering passenger.
@@ -99,7 +115,7 @@ class TaxiEnv(Env):
```
### Version History
* v3: Map Correction + Cleaner Domain Description
* v3: Map Correction + Cleaner Domain Description, v0.25.0 action masking added to the reset and step information
* v2: Disallow Taxi start location = goal location, Update Taxi observations in the rollout, Update Taxi reward threshold.
* v1: Remove (3,2) from locs, add passidx<4 check
* v0: Initial versions release
@@ -214,6 +230,27 @@ class TaxiEnv(Env):
assert 0 <= i < 5
return reversed(out)
def action_mask(self, state: int):
    """Computes an action mask for the action space using the state information.

    Args:
        state: The encoded state, which is decoded with ``self.decode`` into
            ``(taxi_row, taxi_col, pass_loc, dest_idx)``.

    Returns:
        An ``np.ndarray`` of shape ``(6,)`` and dtype ``np.int8`` whose entry ``i`` is 1
        when action ``i`` is expected to change the state and 0 otherwise.
    """
    mask = np.zeros(6, dtype=np.int8)
    # The destination index is not needed: see the drop-off rule below.
    taxi_row, taxi_col, pass_loc, _ = self.decode(state)
    taxi_loc = (taxi_row, taxi_col)

    # Actions 0 and 1 only depend on the taxi row staying within the grid.
    if taxi_row < 4:
        mask[0] = 1
    if taxi_row > 0:
        mask[1] = 1
    # Actions 2 and 3 additionally need a ":" in the map cell beside the taxi;
    # any other character in ``self.desc`` there blocks the move.
    if taxi_col < 4 and self.desc[taxi_row + 1, 2 * taxi_col + 2] == b":":
        mask[2] = 1
    if taxi_col > 0 and self.desc[taxi_row + 1, 2 * taxi_col] == b":":
        mask[3] = 1
    # Action 4 needs the taxi to be on the location indexed by ``pass_loc``.
    # The ``pass_loc < 4`` guard must stay first so ``self.locs`` is never
    # indexed with 4.
    if pass_loc < 4 and taxi_loc == self.locs[pass_loc]:
        mask[4] = 1
    # Action 5 changes the state on any entry of ``self.locs``. The destination
    # is itself one of those entries, so a separate comparison against
    # ``self.locs[dest_idx]`` is redundant.
    # NOTE(review): ``pass_loc == 4`` presumably means the passenger is in the taxi - confirm.
    if pass_loc == 4 and taxi_loc in self.locs:
        mask[5] = 1
    return mask
def step(self, a):
transitions = self.P[self.s][a]
i = categorical_sample([t[0] for t in transitions], self.np_random)
@@ -221,7 +258,8 @@ class TaxiEnv(Env):
self.s = s
self.lastaction = a
self.renderer.render_step()
return (int(s), r, d, {"prob": p})
return int(s), r, d, {"prob": p, "action_mask": self.action_mask(s)}
def reset(
self,
@@ -239,7 +277,7 @@ class TaxiEnv(Env):
if not return_info:
return int(self.s)
else:
return int(self.s), {"prob": 1}
return int(self.s), {"prob": 1.0, "action_mask": self.action_mask(self.s)}
def render(self, mode="human"):
if self.render_mode is not None:

View File

@@ -3,6 +3,7 @@ from typing import Dict, List, Optional, Sequence, SupportsFloat, Tuple, Type, U
import numpy as np
import gym.error
from gym import logger
from gym.spaces.space import Space
from gym.utils import seeding
@@ -146,7 +147,7 @@ class Box(Space[np.ndarray]):
else:
raise ValueError("manner is not in {'below', 'above', 'both'}")
def sample(self) -> np.ndarray:
def sample(self, mask: None = None) -> np.ndarray:
r"""Generates a single random sample inside the Box.
In creating a sample of the box, each coordinate is sampled (independently) from a distribution
@@ -157,9 +158,17 @@ class Box(Space[np.ndarray]):
* :math:`(-\infty, b]` : shifted negative exponential distribution
* :math:`(-\infty, \infty)` : normal distribution
Args:
mask: A mask for sampling values from the Box space, currently unsupported.
Returns:
A sampled value from the Box
"""
if mask is not None:
raise gym.error.Error(
f"Box.sample cannot be provided a mask, actual value: {mask}"
)
high = self.high if self.dtype.kind == "f" else self.high.astype("int64") + 1
sample = np.empty(self.shape)

View File

@@ -1,6 +1,7 @@
"""Implementation of a space that represents the cartesian product of other spaces as a dictionary."""
from collections import OrderedDict
from collections.abc import Mapping, Sequence
from typing import Any
from typing import Dict as TypingDict
from typing import Optional, Union
@@ -137,14 +138,28 @@ class Dict(Space[TypingDict[str, Space]], Mapping):
return seeds
def sample(self) -> dict:
def sample(self, mask: Optional[TypingDict[str, Any]] = None) -> dict:
    """Draws one independent sample from every subspace of this space.

    Args:
        mask: Optional dictionary holding one mask per subspace; its keys must
            match the keys of :attr:`self.spaces`.

    Returns:
        An ordered dictionary mapping each key of :attr:`self.spaces` to a value
        sampled from the corresponding subspace.
    """
    if mask is None:
        # Without a mask the subspaces are sampled with no arguments at all.
        return OrderedDict([(k, space.sample()) for k, space in self.spaces.items()])

    assert isinstance(
        mask, dict
    ), f"Expects mask to be a dict, actual type: {type(mask)}"
    assert (
        mask.keys() == self.spaces.keys()
    ), f"Expect mask keys to be same as space keys, mask keys: {mask.keys()}, space keys: {self.spaces.keys()}"

    sampled = OrderedDict()
    for key, subspace in self.spaces.items():
        sampled[key] = subspace.sample(mask[key])
    return sampled
def contains(self, x) -> bool:

View File

@@ -40,14 +40,40 @@ class Discrete(Space[int]):
self.start = int(start)
super().__init__((), np.int64, seed)
def sample(self) -> int:
def sample(self, mask: Optional[np.ndarray] = None) -> int:
"""Generates a single random sample from this space.
A sample will be chosen uniformly at random.
A sample will be chosen uniformly at random with the mask if provided
Args:
mask: An optional mask for if an action can be selected.
Expected `np.ndarray` of shape `(n,)` and dtype `np.int8` where `1` represents valid actions and `0` invalid / infeasible actions.
If there are no possible actions (i.e. `np.all(mask == 0)`) then `space.start` will be returned.
Returns:
A sampled integer from the space
"""
if mask is not None:
assert isinstance(
mask, np.ndarray
), f"The expected type of the mask is np.ndarray, actual type: {type(mask)}"
assert (
mask.dtype == np.int8
), f"The expected dtype of the mask is np.int8, actual dtype: {mask.dtype}"
assert mask.shape == (
self.n,
), f"The expected shape of the mask is {(self.n,)}, actual shape: {mask.shape}"
valid_action_mask = mask == 1
assert np.all(
np.logical_or(mask == 0, valid_action_mask)
), f"All values of a mask should be 0 or 1, actual values: {mask}"
if np.any(valid_action_mask):
return int(
self.start + self.np_random.choice(np.where(valid_action_mask)[0])
)
else:
return self.start
return int(self.start + self.np_random.integers(self.n))
def contains(self, x) -> bool:

View File

@@ -1,12 +1,12 @@
"""Implementation of a space that represents graph information where nodes and edges can be represented with euclidean space."""
from collections import namedtuple
from typing import NamedTuple, Optional, Sequence, Union
from typing import NamedTuple, Optional, Sequence, Tuple, Union
import numpy as np
from gym.spaces.box import Box
from gym.spaces.discrete import Discrete
from gym.spaces.multi_discrete import MultiDiscrete
from gym.spaces.multi_discrete import SAMPLE_MASK_TYPE, MultiDiscrete
from gym.spaces.space import Space
from gym.utils import seeding
@@ -70,53 +70,80 @@ class Graph(Space):
def _generate_sample_space(
self, base_space: Union[None, Box, Discrete], num: int
) -> Optional[Union[Box, Discrete]]:
# the possibility of this space , got {type(base_space)}aving nothing
if num == 0:
) -> Optional[Union[Box, MultiDiscrete]]:
if num == 0 or base_space is None:
return None
if isinstance(base_space, Box):
return Box(
low=np.array(max(1, num) * [base_space.low]),
high=np.array(max(1, num) * [base_space.high]),
shape=(num, *base_space.shape),
shape=(num,) + base_space.shape,
dtype=base_space.dtype,
seed=self._np_random,
seed=self.np_random,
)
elif isinstance(base_space, Discrete):
return MultiDiscrete(nvec=[base_space.n] * num, seed=self._np_random)
elif base_space is None:
return None
return MultiDiscrete(nvec=[base_space.n] * num, seed=self.np_random)
else:
raise AssertionError(
f"Only Box and Discrete can be accepted as a base_space, got {type(base_space)}, you should not have gotten this error."
f"Expects base space to be Box and Discrete, actual space: {type(base_space)}."
)
def _sample_sample_space(self, sample_space) -> Optional[np.ndarray]:
if sample_space is not None:
return sample_space.sample()
else:
return None
def sample(self) -> NamedTuple:
def sample(
self,
mask: Optional[
Tuple[
Optional[Union[np.ndarray, SAMPLE_MASK_TYPE]],
Optional[Union[np.ndarray, SAMPLE_MASK_TYPE]],
]
] = None,
num_nodes: int = 10,
num_edges: Optional[int] = None,
) -> NamedTuple:
"""Generates a single sample graph with num_nodes between 1 and 10 sampled from the Graph.
Args:
mask: An optional tuple of optional node and edge mask that is only possible with Discrete spaces
(Box spaces don't support sample masks).
If no `num_edges` is provided then the `edge_mask` is multiplied by the number of edges
num_nodes: The number of nodes that will be sampled, the default is 10 nodes
num_edges: An optional number of edges, otherwise, a random number between 0 and `num_nodes`^2
Returns:
A NamedTuple representing a graph with attributes .nodes, .edges, and .edge_links.
"""
num_nodes = self.np_random.integers(low=1, high=10)
assert (
num_nodes > 0
), f"The number of nodes is expected to be greater than 0, actual value: {num_nodes}"
if mask is not None:
node_space_mask, edge_space_mask = mask
else:
node_space_mask, edge_space_mask = None, None
# we only have edges when we have at least 2 nodes
num_edges = 0
if num_nodes > 1:
# maximal number of edges is (n*n) allowing self connections and two way is allowed
num_edges = self.np_random.integers(num_nodes * num_nodes)
if num_edges is None:
if num_nodes > 1:
# maximal number of edges is `n*(n-1)` allowing self connections and two-way is allowed
num_edges = self.np_random.integers(num_nodes * (num_nodes - 1))
else:
num_edges = 0
if edge_space_mask is not None:
edge_space_mask = tuple(edge_space_mask for _ in range(num_edges))
else:
assert (
num_edges >= 0
), f"The number of edges is expected to be greater than 0, actual mask: {num_edges}"
node_sample_space = self._generate_sample_space(self.node_space, num_nodes)
edge_sample_space = self._generate_sample_space(self.edge_space, num_edges)
sampled_node_space = self._generate_sample_space(self.node_space, num_nodes)
sampled_edge_space = self._generate_sample_space(self.edge_space, num_edges)
sampled_nodes = self._sample_sample_space(node_sample_space)
sampled_edges = self._sample_sample_space(edge_sample_space)
sampled_nodes = sampled_node_space.sample(node_space_mask)
sampled_edges = (
sampled_edge_space.sample(edge_space_mask)
if sampled_edge_space is not None
else None
)
sampled_edge_links = None
if sampled_edges is not None and num_edges > 0:

View File

@@ -51,14 +51,36 @@ class MultiBinary(Space[np.ndarray]):
"""Has stricter type than gym.Space - never None."""
return self._shape # type: ignore
def sample(self) -> np.ndarray:
def sample(self, mask: Optional[np.ndarray] = None) -> np.ndarray:
    """Draws a random sample from this space, one fair coin toss per binary variable.

    Args:
        mask: Optional ``np.ndarray`` of dtype ``np.int8`` with the same shape as the
            space and containing only 0s and 1s. Wherever the mask is 0 the sample is 0.

    Returns:
        The sampled values.
    """
    if mask is not None:
        assert isinstance(
            mask, np.ndarray
        ), f"The expected type of the mask is np.ndarray, actual type: {type(mask)}"
        assert (
            mask.dtype == np.int8
        ), f"The expected dtype of the mask is np.int8, actual dtype: {mask.dtype}"
        assert (
            mask.shape == self.shape
        ), f"The expected shape of the mask is {self.shape}, actual shape: {mask.shape}"
        is_binary = np.logical_or(mask == 0, mask == 1)
        assert np.all(
            is_binary
        ), f"All values of a mask should be 0 or 1, actual values: {mask}"

    # The draw is identical with or without a mask; the mask only zeroes entries.
    coin_tosses = self.np_random.integers(
        low=0, high=2, size=self.n, dtype=self.dtype
    )
    if mask is None:
        return coin_tosses
    return mask * coin_tosses
def contains(self, x) -> bool:

View File

@@ -8,6 +8,8 @@ from gym.spaces.discrete import Discrete
from gym.spaces.space import Space
from gym.utils import seeding
SAMPLE_MASK_TYPE = Tuple[Union["SAMPLE_MASK_TYPE", np.ndarray], ...]
class MultiDiscrete(Space[np.ndarray]):
"""This represents the cartesian product of arbitrary :class:`Discrete` spaces.
@@ -23,8 +25,17 @@ class MultiDiscrete(Space[np.ndarray]):
2. Button A: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1
3. Button B: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1
It can be initialized as ``MultiDiscrete([ 5, 2, 2 ])``
It can be initialized as ``MultiDiscrete([ 5, 2, 2 ])`` such that a sample might be ``array([3, 1, 0])``.
Although this feature is rarely used, :class:`MultiDiscrete` spaces may also have several axes
if ``nvec`` has several axes:
Example::
>> d = MultiDiscrete(np.array([[1, 2], [3, 4]]))
>> d.sample()
array([[0, 0],
[2, 3]])
"""
def __init__(
@@ -37,16 +48,6 @@ class MultiDiscrete(Space[np.ndarray]):
The argument ``nvec`` will determine the number of values each categorical variable can take.
Although this feature is rarely used, :class:`MultiDiscrete` spaces may also have several axes
if ``nvec`` has several axes:
Example::
>> d = MultiDiscrete(np.array([[1, 2], [3, 4]]))
>> d.sample()
array([[0, 0],
[2, 3]])
Args:
nvec: vector of counts of each categorical variable. This will usually be a list of integers. However,
you may also pass a more complicated numpy array if you'd like the space to have several axes.
@@ -63,8 +64,56 @@ class MultiDiscrete(Space[np.ndarray]):
"""Has stricter type than :class:`gym.Space` - never None."""
return self._shape # type: ignore
def sample(self) -> np.ndarray:
"""Generates a single random sample this space."""
def sample(self, mask: Optional[SAMPLE_MASK_TYPE] = None) -> np.ndarray:
    """Generates a single random sample from this space.

    Args:
        mask: An optional mask for multi-discrete, expects tuples with a `np.ndarray` mask in the position of each
            action with shape `(n,)` where `n` is the number of actions and `dtype=np.int8`.
            Tuples may be nested so that the mask structure mirrors the axes of ``nvec``.
            Only mask values == 1 are possible to sample; if all mask values for an action are 0
            then the default action 0 is sampled.

    Returns:
        An `np.ndarray` of shape `space.shape`
    """
    if mask is not None:

        def _apply_mask(
            sub_mask: SAMPLE_MASK_TYPE, sub_nvec: np.ndarray
        ) -> Union[int, List[int]]:
            # Leaf case: an ndarray mask must line up with one integer entry of nvec.
            if isinstance(sub_mask, np.ndarray):
                assert np.issubdtype(
                    type(sub_nvec), np.integer
                ), f"Expects the mask to be for an action, actual for {sub_nvec}"
                assert (
                    len(sub_mask) == sub_nvec
                ), f"Expects the mask length to be equal to the number of actions, mask length: {len(sub_mask)}, nvec length: {sub_nvec}"
                assert (
                    sub_mask.dtype == np.int8
                ), f"Expects the mask dtype to be np.int8, actual dtype: {sub_mask.dtype}"

                valid_action_mask = sub_mask == 1
                assert np.all(
                    np.logical_or(sub_mask == 0, valid_action_mask)
                ), f"Expects all masks values to 0 or 1, actual values: {sub_mask}"

                if np.any(valid_action_mask):
                    # Choose among the indices whose mask value is 1.
                    return self.np_random.choice(np.where(valid_action_mask)[0])
                else:
                    # Nothing is allowed for this entry: fall back to action 0.
                    return 0
            else:
                # Recursive case: a tuple of masks is matched element-wise against
                # the corresponding slice of nvec.
                assert isinstance(
                    sub_mask, tuple
                ), f"Expects the mask to be a tuple or np.ndarray, actual type: {type(sub_mask)}"
                assert len(sub_mask) == len(
                    sub_nvec
                ), f"Expects the mask length to be equal to the number of actions, mask length: {len(sub_mask)}, nvec length: {len(sub_nvec)}"
                return [
                    _apply_mask(new_mask, new_nvec)
                    for new_mask, new_nvec in zip(sub_mask, sub_nvec)
                ]

        return np.array(_apply_mask(mask, self.nvec), dtype=self.dtype)

    # Unmasked: scale uniform draws in [0, 1) by nvec and truncate to the space dtype.
    return (self.np_random.random(self.nvec.shape) * self.nvec).astype(self.dtype)
def contains(self, x) -> bool:

View File

@@ -1,6 +1,7 @@
"""Implementation of the `Space` metaclass."""
from typing import (
Any,
Generic,
Iterable,
List,
@@ -81,8 +82,17 @@ class Space(Generic[T_cov]):
"""Return the shape of the space as an immutable property."""
return self._shape
def sample(self) -> T_cov:
"""Randomly sample an element of this space. Can be uniform or non-uniform sampling based on boundedness of space."""
def sample(self, mask: Optional[Any] = None) -> T_cov:
    """Randomly sample an element of this space.

    Can be uniform or non-uniform sampling based on boundedness of space.
    This base implementation always raises; subclasses provide the sampling logic.

    Args:
        mask: A mask used for sampling, expected ``dtype=np.int8`` and see sample implementation for expected shape.

    Returns:
        A sampled action from the space

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
def seed(self, seed: Optional[int] = None) -> list:

View File

@@ -1,5 +1,5 @@
"""Implementation of a space that represents the cartesian product of other spaces."""
from typing import Iterable, List, Optional, Sequence, Union
from typing import Iterable, List, Optional, Sequence, Tuple, Union
import numpy as np
@@ -72,14 +72,31 @@ class Tuple(Space[tuple], Sequence):
return seeds
def sample(self) -> tuple:
def sample(self, mask: Optional[Tuple[Optional[np.ndarray]]] = None) -> tuple:
    """Draws one independent sample from each subspace and packs them into a tuple.

    Args:
        mask: Optional tuple holding one (optional) mask per subspace, in subspace
            order; its length must equal the number of subspaces.

    Returns:
        A tuple containing one sample per subspace.
    """
    if mask is None:
        # Without a mask the subspaces are sampled with no arguments at all.
        return tuple(subspace.sample() for subspace in self.spaces)

    assert isinstance(
        mask, tuple
    ), f"Expected type of mask is tuple, actual type: {type(mask)}"
    assert len(mask) == len(
        self.spaces
    ), f"Expected length of mask is {len(self.spaces)}, actual length: {len(mask)}"

    drawn = []
    for subspace, sub_mask in zip(self.spaces, mask):
        drawn.append(subspace.sample(mask=sub_mask))
    return tuple(drawn)
def contains(self, x) -> bool:

View File

@@ -3,6 +3,7 @@ import pytest
import gym
from gym.envs.box2d import BipedalWalker
from gym.envs.box2d.lunar_lander import demo_heuristic_lander
from gym.envs.toy_text import TaxiEnv
from gym.envs.toy_text.frozen_lake import generate_random_map
@@ -80,3 +81,24 @@ def test_frozenlake_dfs_map_generation(map_size: int):
if new_frozenlake[new_row][new_col] not in "#H":
frontier.append((new_row, new_col))
raise AssertionError("No path through the frozenlake was found.")
def test_taxi_action_mask():
    """Check, for every state and action, that the mask agrees with the transition table.

    An action flagged in the mask must lead to a different state, and an
    unflagged action must leave the state unchanged.
    """
    taxi = TaxiEnv()
    for current in taxi.P:
        allowed = taxi.action_mask(current)
        for act, flag in enumerate(allowed):
            # Only the first listed transition is inspected; its second field
            # is the successor state.
            successor = taxi.P[current][act][0][1]
            if flag:
                assert current != successor
            else:
                assert current == successor
def test_taxi_encode_decode():
    """Check that ``encode`` inverts ``decode`` on the states of a 100-step random rollout."""
    env = TaxiEnv()
    state = env.reset()
    remaining = 100
    while remaining > 0:
        assert (
            env.encode(*env.decode(state)) == state
        ), f"state={state}, encode(decode(state))={env.encode(*env.decode(state))}"
        random_action = env.action_space.sample()
        state, _reward, _done, _info = env.step(random_action)
        remaining -= 1

View File

@@ -2,10 +2,12 @@ import copy
import json # note: ujson fails this test due to float equality
import pickle
import tempfile
from typing import List, Union
import numpy as np
import pytest
from gym import Space
from gym.spaces import Box, Dict, Discrete, Graph, MultiBinary, MultiDiscrete, Tuple
@@ -149,36 +151,290 @@ def test_inequality(spaces):
assert space1 != space2, f"Expected {space1} != {space2}"
# The expected sum of variance for an alpha of 0.05
# CHI_SQUARED = [0] + [scipy.stats.chi2.isf(0.05, df=df) for df in range(1, 25)]
CHI_SQUARED = np.array(
[
0.01,
3.8414588206941285,
5.991464547107983,
7.814727903251178,
9.487729036781158,
11.070497693516355,
12.59158724374398,
14.067140449340167,
15.507313055865454,
16.91897760462045,
]
)
@pytest.mark.parametrize(
"space",
[
Discrete(1),
Discrete(5),
Discrete(8, start=-20),
Box(low=0, high=255, shape=(2,), dtype="uint8"),
Box(low=-np.inf, high=np.inf, shape=(3, 3)),
Box(low=1.0, high=np.inf, shape=(3, 3)),
Box(low=-np.inf, high=2.0, shape=(3, 3)),
Box(low=0, high=255, shape=(2,), dtype=np.uint8),
Box(low=-np.inf, high=np.inf, shape=(3,)),
Box(low=1.0, high=np.inf, shape=(3,)),
Box(low=-np.inf, high=2.0, shape=(3,)),
Box(low=np.array([0, 2]), high=np.array([10, 4])),
MultiDiscrete([3, 5]),
MultiDiscrete(np.array([[3, 5], [2, 1]])),
MultiBinary([2, 4]),
],
)
def test_sample(space):
def test_sample(space: Space, n_trials: int = 1_000):
"""Test the space sample has the expected distribution with the chi-squared test and KS test.
Example code with scipy.stats.chisquared
import scipy.stats
variance = np.sum(np.square(observed_frequency - expected_frequency) / expected_frequency)
f'X2 at alpha=0.05 = {scipy.stats.chi2.isf(0.05, df=4)}'
f'p-value = {scipy.stats.chi2.sf(variance, df=4)}'
scipy.stats.chisquare(f_obs=observed_frequency)
"""
space.seed(0)
n_trials = 100
samples = np.array([space.sample() for _ in range(n_trials)])
expected_mean = 0.0
if isinstance(space, Box):
if space.is_bounded():
expected_mean = (space.high + space.low) / 2
elif space.is_bounded("below"):
expected_mean = 1 + space.low
elif space.is_bounded("above"):
expected_mean = -1 + space.high
assert len(samples) == n_trials
# todo add Box space test
if isinstance(space, Discrete):
expected_frequency = np.ones(space.n) * n_trials / space.n
observed_frequency = np.zeros(space.n)
for sample in samples:
observed_frequency[sample - space.start] += 1
degrees_of_freedom = space.n - 1
assert observed_frequency.shape == expected_frequency.shape
assert np.sum(observed_frequency) == n_trials
variance = np.sum(
np.square(expected_frequency - observed_frequency) / expected_frequency
)
assert variance < CHI_SQUARED[degrees_of_freedom]
elif isinstance(space, MultiBinary):
expected_frequency = n_trials / 2
observed_frequency = np.sum(samples, axis=0)
assert observed_frequency.shape == space.shape
# As this is a binary space, then we can be lazy in the variance as the np.square is symmetric for the 0 and 1 categories
variance = (
2 * np.square(observed_frequency - expected_frequency) / expected_frequency
)
assert variance.shape == space.shape
assert np.all(variance < CHI_SQUARED[1])
elif isinstance(space, MultiDiscrete):
# Due to the multi-axis capability of MultiDiscrete, these functions need to be recursive and that the expected / observed numpy are of non-regular shapes
def _generate_frequency(dim, func):
if isinstance(dim, np.ndarray):
return np.array(
[_generate_frequency(sub_dim, func) for sub_dim in dim],
dtype=object,
)
else:
return func(dim)
def _update_observed_frequency(obs_sample, obs_freq):
if isinstance(obs_sample, np.ndarray):
for sub_sample, sub_freq in zip(obs_sample, obs_freq):
_update_observed_frequency(sub_sample, sub_freq)
else:
obs_freq[obs_sample] += 1
expected_frequency = _generate_frequency(
space.nvec, lambda dim: np.ones(dim) * n_trials / dim
)
observed_frequency = _generate_frequency(space.nvec, lambda dim: np.zeros(dim))
for sample in samples:
_update_observed_frequency(sample, observed_frequency)
def _chi_squared_test(dim, exp_freq, obs_freq):
if isinstance(dim, np.ndarray):
for sub_dim, sub_exp_freq, sub_obs_freq in zip(dim, exp_freq, obs_freq):
_chi_squared_test(sub_dim, sub_exp_freq, sub_obs_freq)
else:
assert exp_freq.shape == (dim,) and obs_freq.shape == (dim,)
assert np.sum(obs_freq) == n_trials
assert np.sum(exp_freq) == n_trials
_variance = np.sum(np.square(exp_freq - obs_freq) / exp_freq)
_degrees_of_freedom = dim - 1
assert _variance < CHI_SQUARED[_degrees_of_freedom]
_chi_squared_test(space.nvec, expected_frequency, observed_frequency)
@pytest.mark.parametrize(
"space,mask",
[
(Discrete(5), np.array([0, 1, 1, 0, 1], dtype=np.int8)),
(Discrete(4, start=-20), np.array([1, 1, 0, 1], dtype=np.int8)),
(Discrete(4, start=1), np.array([0, 0, 0, 0], dtype=np.int8)),
(MultiBinary([3, 2]), np.array([[0, 1], [1, 1], [0, 0]], dtype=np.int8)),
(
MultiDiscrete([5, 3]),
(
np.array([0, 1, 1, 0, 1], dtype=np.int8),
np.array([0, 1, 1], dtype=np.int8),
),
),
(
MultiDiscrete(np.array([4, 2])),
(np.array([0, 0, 0, 0], dtype=np.int8), np.array([1, 1], dtype=np.int8)),
),
(
MultiDiscrete(np.array([[2, 2], [4, 3]])),
(
(np.array([0, 1], dtype=np.int8), np.array([1, 1], dtype=np.int8)),
(
np.array([0, 1, 1, 0], dtype=np.int8),
np.array([1, 0, 0], dtype=np.int8),
),
),
),
],
)
def test_space_sample_mask(space, mask, n_trials: int = 100):
"""Test the space sample with mask works using the pearson chi-squared test."""
space.seed(1)
samples = np.array([space.sample(mask) for _ in range(n_trials)])
if isinstance(space, Discrete):
if np.any(mask == 1):
expected_frequency = np.ones(space.n) * (n_trials / np.sum(mask)) * mask
else:
expected_mean = 0.0
elif isinstance(space, Discrete):
expected_mean = space.start + space.n / 2
expected_frequency = np.zeros(space.n)
expected_frequency[0] = n_trials
observed_frequency = np.zeros(space.n)
for sample in samples:
observed_frequency[sample - space.start] += 1
degrees_of_freedom = max(np.sum(mask) - 1, 0)
assert observed_frequency.shape == expected_frequency.shape
assert np.sum(observed_frequency) == n_trials
assert np.sum(expected_frequency) == n_trials
variance = np.sum(
np.square(expected_frequency - observed_frequency)
/ np.clip(expected_frequency, 1, None)
)
assert variance < CHI_SQUARED[degrees_of_freedom]
elif isinstance(space, MultiBinary):
expected_frequency = np.ones(space.shape) * mask * (n_trials / 2)
observed_frequency = np.sum(samples, axis=0)
assert space.shape == expected_frequency.shape == observed_frequency.shape
variance = (
2
* np.square(observed_frequency - expected_frequency)
/ np.clip(expected_frequency, 1, None)
)
assert variance.shape == space.shape
assert np.all(variance < CHI_SQUARED[1])
elif isinstance(space, MultiDiscrete):
# Due to the multi-axis capability of MultiDiscrete, these functions need to be recursive and that the expected / observed numpy are of non-regular shapes
def _generate_frequency(
_dim: Union[np.ndarray, int], _mask, func: callable
) -> List:
if isinstance(_dim, np.ndarray):
return [
_generate_frequency(sub_dim, sub_mask, func)
for sub_dim, sub_mask in zip(_dim, _mask)
]
else:
return func(_dim, _mask)
def _update_observed_frequency(obs_sample, obs_freq):
if isinstance(obs_sample, np.ndarray):
for sub_sample, sub_freq in zip(obs_sample, obs_freq):
_update_observed_frequency(sub_sample, sub_freq)
else:
obs_freq[obs_sample] += 1
def _exp_freq_fn(_dim: int, _mask: np.ndarray):
if np.any(_mask == 1):
assert _dim == len(_mask)
return np.ones(_dim) * (n_trials / np.sum(_mask)) * _mask
else:
freq = np.zeros(_dim)
freq[0] = n_trials
return freq
expected_frequency = _generate_frequency(
space.nvec, mask, lambda dim, _mask: _exp_freq_fn(dim, _mask)
)
observed_frequency = _generate_frequency(
space.nvec, mask, lambda dim, _: np.zeros(dim)
)
for sample in samples:
_update_observed_frequency(sample, observed_frequency)
def _chi_squared_test(dim, _mask, exp_freq, obs_freq):
if isinstance(dim, np.ndarray):
for sub_dim, sub_mask, sub_exp_freq, sub_obs_freq in zip(
dim, _mask, exp_freq, obs_freq
):
_chi_squared_test(sub_dim, sub_mask, sub_exp_freq, sub_obs_freq)
else:
assert exp_freq.shape == (dim,) and obs_freq.shape == (dim,)
assert np.sum(obs_freq) == n_trials
assert np.sum(exp_freq) == n_trials
_variance = np.sum(
np.square(exp_freq - obs_freq) / np.clip(exp_freq, 1, None)
)
_degrees_of_freedom = max(np.sum(_mask) - 1, 0)
assert _variance < CHI_SQUARED[_degrees_of_freedom]
_chi_squared_test(space.nvec, mask, expected_frequency, observed_frequency)
else:
raise NotImplementedError
np.testing.assert_allclose(expected_mean, samples.mean(), atol=3.0 * samples.std())
raise NotImplementedError()
@pytest.mark.parametrize(
"space,mask",
[
(
Dict(a=Discrete(2), b=MultiDiscrete([2, 4])),
{
"a": np.array([0, 1], dtype=np.int8),
"b": (
np.array([0, 1], dtype=np.int8),
np.array([1, 1, 0, 0], dtype=np.int8),
),
},
),
(
Tuple([Box(0, 1, ()), Discrete(3), MultiBinary([2, 1])]),
(
None,
np.array([0, 1, 0], dtype=np.int8),
np.array([[0], [1]], dtype=np.int8),
),
),
(
Dict(a=Tuple([Box(0, 1, ()), Discrete(3)]), b=Discrete(3)),
{
"a": (None, np.array([1, 0, 0], dtype=np.int8)),
"b": np.array([0, 1, 1], dtype=np.int8),
},
),
(Graph(node_space=Discrete(5), edge_space=Discrete(3)), None),
(
Graph(node_space=Discrete(3), edge_space=Box(low=0, high=1, shape=(5,))),
None,
),
(
Graph(
node_space=Box(low=-100, high=100, shape=(3,)), edge_space=Discrete(3)
),
None,
),
],
)
def test_composite_space_sample_mask(space, mask):
    """Test that composite space samples use the mask correctly.

    This is a smoke test: it only checks that ``space.sample(mask)`` runs without
    raising for each parametrized ``(space, mask)`` pair; the sampled value is not inspected.
    """
    space.sample(mask)
@pytest.mark.parametrize(