From fcbff7de129baf14d2ec80734c59ca3c93cb5e45 Mon Sep 17 00:00:00 2001 From: Rushiv Arora Date: Fri, 21 Jan 2022 11:28:34 -0500 Subject: [PATCH] Added singledispatch utility to vector.utils & changed order of space argument. (#2536) * Fixed ordering of space. Added singledispatch utility. * Added singledispatch utility to vector.utils & changed order of space argument * Fixed Error from _BaseGymSpaces * Minor adjustment for Discrete Spaces * Fixed Tests/ to reflect changes * Fixed precommit error - custom namespaces * Concrete Implementations start with _ --- gym/spaces/utils.py | 38 +++++----- gym/vector/async_vector_env.py | 12 +-- gym/vector/sync_vector_env.py | 4 +- gym/vector/utils/numpy_utils.py | 72 +++++++++--------- gym/vector/utils/shared_memory.py | 118 +++++++++++++++-------------- gym/vector/utils/spaces.py | 23 +++--- tests/vector/test_numpy_utils.py | 2 +- tests/vector/test_shared_memory.py | 6 +- 8 files changed, 139 insertions(+), 136 deletions(-) diff --git a/gym/spaces/utils.py b/gym/spaces/utils.py index 58d80ac8a..387be60ad 100644 --- a/gym/spaces/utils.py +++ b/gym/spaces/utils.py @@ -24,27 +24,27 @@ def flatdim(space): @flatdim.register(Box) @flatdim.register(MultiBinary) -def flatdim_box_multibinary(space): +def _flatdim_box_multibinary(space): return reduce(op.mul, space.shape, 1) @flatdim.register(Discrete) -def flatdim_discrete(space): +def _flatdim_discrete(space): return int(space.n) @flatdim.register(MultiDiscrete) -def flatdim_multidiscrete(space): +def _flatdim_multidiscrete(space): return int(np.sum(space.nvec)) @flatdim.register(Tuple) -def flatdim_tuple(space): +def _flatdim_tuple(space): return sum(flatdim(s) for s in space.spaces) @flatdim.register(Dict) -def flatdim_dict(space): +def _flatdim_dict(space): return sum(flatdim(s) for s in space.spaces.values()) @@ -64,19 +64,19 @@ def flatten(space, x): @flatten.register(Box) @flatten.register(MultiBinary) -def flatten_box_multibinary(space, x): +def _flatten_box_multibinary(space, x): return np.asarray(x, dtype=space.dtype).flatten() @flatten.register(Discrete) -def flatten_discrete(space, x): +def _flatten_discrete(space, x): onehot = np.zeros(space.n, dtype=space.dtype) onehot[x] = 1 return onehot @flatten.register(MultiDiscrete) -def flatten_multidiscrete(space, x): +def _flatten_multidiscrete(space, x): offsets = np.zeros((space.nvec.size + 1,), dtype=space.dtype) offsets[1:] = np.cumsum(space.nvec.flatten()) @@ -86,12 +86,12 @@ def flatten_multidiscrete(space, x): @flatten.register(Tuple) -def flatten_tuple(space, x): +def _flatten_tuple(space, x): return np.concatenate([flatten(s, x_part) for x_part, s in zip(x, space.spaces)]) @flatten.register(Dict) -def flatten_dict(space, x): +def _flatten_dict(space, x): return np.concatenate([flatten(s, x[key]) for key, s in space.spaces.items()]) @@ -111,17 +111,17 @@ def unflatten(space, x): @unflatten.register(Box) @unflatten.register(MultiBinary) -def unflatten_box_multibinary(space, x): +def _unflatten_box_multibinary(space, x): return np.asarray(x, dtype=space.dtype).reshape(space.shape) @unflatten.register(Discrete) -def unflatten_discrete(space, x): +def _unflatten_discrete(space, x): return int(np.nonzero(x)[0][0]) @unflatten.register(MultiDiscrete) -def unflatten_multidiscrete(space, x): +def _unflatten_multidiscrete(space, x): offsets = np.zeros((space.nvec.size + 1,), dtype=space.dtype) offsets[1:] = np.cumsum(space.nvec.flatten()) @@ -130,7 +130,7 @@ def unflatten_multidiscrete(space, x): @unflatten.register(Tuple) -def unflatten_tuple(space, x): +def _unflatten_tuple(space, x): dims = np.asarray([flatdim(s) for s in space.spaces], dtype=np.int_) list_flattened = np.split(x, np.cumsum(dims[:-1])) return tuple( @@ -139,7 +139,7 @@ def unflatten_tuple(space, x): @unflatten.register(Dict) -def unflatten_dict(space, x): +def _unflatten_dict(space, x): dims = np.asarray([flatdim(s) for s in space.spaces.values()], dtype=np.int_) list_flattened = np.split(x, np.cumsum(dims[:-1])) return OrderedDict( @@ -193,19 +193,19 @@ def flatten_space(space): @flatten_space.register(Box) -def flatten_space_box(space): +def _flatten_space_box(space): return Box(space.low.flatten(), space.high.flatten(), dtype=space.dtype) @flatten_space.register(Discrete) @flatten_space.register(MultiBinary) @flatten_space.register(MultiDiscrete) -def flatten_space_binary(space): +def _flatten_space_binary(space): return Box(low=0, high=1, shape=(flatdim(space),), dtype=space.dtype) @flatten_space.register(Tuple) -def flatten_space_tuple(space): +def _flatten_space_tuple(space): space = [flatten_space(s) for s in space.spaces] return Box( low=np.concatenate([s.low for s in space]), @@ -215,7 +215,7 @@ def flatten_space_tuple(space): @flatten_space.register(Dict) -def flatten_space_dict(space): +def _flatten_space_dict(space): space = [flatten_space(s) for s in space.spaces.values()] return Box( low=np.concatenate([s.low for s in space]), diff --git a/gym/vector/async_vector_env.py b/gym/vector/async_vector_env.py index 369ee523a..7eb1f4c53 100644 --- a/gym/vector/async_vector_env.py +++ b/gym/vector/async_vector_env.py @@ -144,7 +144,7 @@ class AsyncVectorEnv(VectorEnv): self.single_observation_space, n=self.num_envs, ctx=ctx ) self.observations = read_from_shared_memory( - _obs_buffer, self.single_observation_space, n=self.num_envs + self.single_observation_space, _obs_buffer, n=self.num_envs ) except CustomSpaceError: raise ValueError( @@ -301,7 +301,7 @@ class AsyncVectorEnv(VectorEnv): if not self.shared_memory: self.observations = concatenate( - results, self.observations, self.single_observation_space + self.single_observation_space, results, self.observations ) return deepcopy(self.observations) if self.copy else self.observations @@ -392,7 +392,9 @@ class AsyncVectorEnv(VectorEnv): if not self.shared_memory: self.observations = concatenate( - observations_list, self.observations, self.single_observation_space + self.single_observation_space, + observations_list, + self.observations, ) return ( @@ -568,7 +570,7 @@ def _worker_shared_memory(index, env_fn, pipe, parent_pipe, shared_memory, error if command == "reset": observation = env.reset(**data) write_to_shared_memory( - index, observation, shared_memory, observation_space + observation_space, index, observation, shared_memory ) pipe.send((None, True)) elif command == "step": @@ -577,7 +579,7 @@ def _worker_shared_memory(index, env_fn, pipe, parent_pipe, shared_memory, error info["terminal_observation"] = observation observation = env.reset() write_to_shared_memory( - index, observation, shared_memory, observation_space + observation_space, index, observation, shared_memory ) pipe.send(((None, reward, done, info), True)) elif command == "seed": diff --git a/gym/vector/sync_vector_env.py b/gym/vector/sync_vector_env.py index e4981aaac..c457cba7c 100644 --- a/gym/vector/sync_vector_env.py +++ b/gym/vector/sync_vector_env.py @@ -108,7 +108,7 @@ class SyncVectorEnv(VectorEnv): observation = env.reset(**single_kwargs) observations.append(observation) self.observations = concatenate( - observations, self.observations, self.single_observation_space + self.single_observation_space, observations, self.observations ) return deepcopy(self.observations) if self.copy else self.observations @@ -126,7 +126,7 @@ class SyncVectorEnv(VectorEnv): observations.append(observation) infos.append(info) self.observations = concatenate( - observations, self.observations, self.single_observation_space + self.single_observation_space, observations, self.observations ) return ( diff --git a/gym/vector/utils/numpy_utils.py b/gym/vector/utils/numpy_utils.py index 644ee9ff1..36d411577 100644 --- a/gym/vector/utils/numpy_utils.py +++ b/gym/vector/utils/numpy_utils.py @@ -1,13 +1,16 @@ import numpy as np -from gym.spaces import Space, Tuple, Dict +from gym.spaces import Space, Box, Discrete, MultiDiscrete, MultiBinary, Tuple, Dict from gym.vector.utils.spaces import _BaseGymSpaces from collections import OrderedDict +from functools import singledispatch + __all__ = ["concatenate", "create_empty_array"] -def concatenate(items, out, space): +@singledispatch +def concatenate(space, items, out): """Concatenate multiple samples from space into a single object. Parameters @@ -37,44 +40,43 @@ def concatenate(items, out, space): [0.87383074, 0.192658 , 0.2148103 ]], dtype=float32) """ assert isinstance(items, (list, tuple)) - if isinstance(space, _BaseGymSpaces): - return concatenate_base(items, out, space) - elif isinstance(space, Tuple): - return concatenate_tuple(items, out, space) - elif isinstance(space, Dict): - return concatenate_dict(items, out, space) - elif isinstance(space, Space): - return concatenate_custom(items, out, space) - else: - raise ValueError( - f"Space of type `{type(space)}` is not a valid `gym.Space` instance." - ) + raise ValueError( + f"Space of type `{type(space)}` is not a valid `gym.Space` instance." + ) -def concatenate_base(items, out, space): +@concatenate.register(Box) +@concatenate.register(Discrete) +@concatenate.register(MultiDiscrete) +@concatenate.register(MultiBinary) +def _concatenate_base(space, items, out): return np.stack(items, axis=0, out=out) -def concatenate_tuple(items, out, space): +@concatenate.register(Tuple) +def _concatenate_tuple(space, items, out): return tuple( - concatenate([item[i] for item in items], out[i], subspace) + concatenate(subspace, [item[i] for item in items], out[i]) for (i, subspace) in enumerate(space.spaces) ) -def concatenate_dict(items, out, space): +@concatenate.register(Dict) +def _concatenate_dict(space, items, out): return OrderedDict( [ - (key, concatenate([item[key] for item in items], out[key], subspace)) + (key, concatenate(subspace, [item[key] for item in items], out[key])) for (key, subspace) in space.spaces.items() ] ) -def concatenate_custom(items, out, space): +@concatenate.register(Space) +def _concatenate_custom(space, items, out): return tuple(items) +@singledispatch def create_empty_array(space, n=1, fn=np.zeros): """Create an empty (possibly nested) numpy array. @@ -108,30 +110,27 @@ def create_empty_array(space, n=1, fn=np.zeros): ('velocity', array([[0., 0.], [0., 0.]], dtype=float32))]) """ - if isinstance(space, _BaseGymSpaces): - return create_empty_array_base(space, n=n, fn=fn) - elif isinstance(space, Tuple): - return create_empty_array_tuple(space, n=n, fn=fn) - elif isinstance(space, Dict): - return create_empty_array_dict(space, n=n, fn=fn) - elif isinstance(space, Space): - return create_empty_array_custom(space, n=n, fn=fn) - else: - raise ValueError( - f"Space of type `{type(space)}` is not a valid `gym.Space` instance." - ) + raise ValueError( + f"Space of type `{type(space)}` is not a valid `gym.Space` instance." + ) -def create_empty_array_base(space, n=1, fn=np.zeros): +@create_empty_array.register(Box) +@create_empty_array.register(Discrete) +@create_empty_array.register(MultiDiscrete) +@create_empty_array.register(MultiBinary) +def _create_empty_array_base(space, n=1, fn=np.zeros): shape = space.shape if (n is None) else (n,) + space.shape return fn(shape, dtype=space.dtype) -def create_empty_array_tuple(space, n=1, fn=np.zeros): +@create_empty_array.register(Tuple) +def _create_empty_array_tuple(space, n=1, fn=np.zeros): return tuple(create_empty_array(subspace, n=n, fn=fn) for subspace in space.spaces) -def create_empty_array_dict(space, n=1, fn=np.zeros): +@create_empty_array.register(Dict) +def _create_empty_array_dict(space, n=1, fn=np.zeros): return OrderedDict( [ (key, create_empty_array(subspace, n=n, fn=fn)) @@ -140,5 +139,6 @@ def create_empty_array_dict(space, n=1, fn=np.zeros): ) -def create_empty_array_custom(space, n=1, fn=np.zeros): +@create_empty_array.register(Space) +def _create_empty_array_custom(space, n=1, fn=np.zeros): return None diff --git a/gym/vector/utils/shared_memory.py b/gym/vector/utils/shared_memory.py index b01db4d6e..f680bb147 100644 --- a/gym/vector/utils/shared_memory.py +++ b/gym/vector/utils/shared_memory.py @@ -4,13 +4,16 @@ from ctypes import c_bool from collections import OrderedDict from gym import logger -from gym.spaces import Tuple, Dict +from gym.spaces import Space, Box, Discrete, MultiDiscrete, MultiBinary, Tuple, Dict from gym.error import CustomSpaceError from gym.vector.utils.spaces import _BaseGymSpaces +from functools import singledispatch + __all__ = ["create_shared_memory", "read_from_shared_memory", "write_to_shared_memory"] +@singledispatch def create_shared_memory(space, n=1, ctx=mp): """Create a shared memory object, to be shared across processes. This eventually contains the observations from the vectorized environment. @@ -32,36 +35,35 @@ def create_shared_memory(space, n=1, ctx=mp): shared_memory : dict, tuple, or `multiprocessing.Array` instance Shared object across processes. """ - if isinstance(space, _BaseGymSpaces): - return create_base_shared_memory(space, n=n, ctx=ctx) - elif isinstance(space, Tuple): - return create_tuple_shared_memory(space, n=n, ctx=ctx) - elif isinstance(space, Dict): - return create_dict_shared_memory(space, n=n, ctx=ctx) - else: - raise CustomSpaceError( - "Cannot create a shared memory for space with " - "type `{}`. Shared memory only supports " - "default Gym spaces (e.g. `Box`, `Tuple`, " - "`Dict`, etc...), and does not support custom " - "Gym spaces.".format(type(space)) - ) + raise CustomSpaceError( + "Cannot create a shared memory for space with " + "type `{}`. Shared memory only supports " + "default Gym spaces (e.g. `Box`, `Tuple`, " + "`Dict`, etc...), and does not support custom " + "Gym spaces.".format(type(space)) + ) -def create_base_shared_memory(space, n=1, ctx=mp): +@create_shared_memory.register(Box) +@create_shared_memory.register(Discrete) +@create_shared_memory.register(MultiDiscrete) +@create_shared_memory.register(MultiBinary) +def _create_base_shared_memory(space, n=1, ctx=mp): dtype = space.dtype.char if dtype in "?": dtype = c_bool return ctx.Array(dtype, n * int(np.prod(space.shape))) -def create_tuple_shared_memory(space, n=1, ctx=mp): +@create_shared_memory.register(Tuple) +def _create_tuple_shared_memory(space, n=1, ctx=mp): return tuple( create_shared_memory(subspace, n=n, ctx=ctx) for subspace in space.spaces ) -def create_dict_shared_memory(space, n=1, ctx=mp): +@create_shared_memory.register(Dict) +def _create_dict_shared_memory(space, n=1, ctx=mp): return OrderedDict( [ (key, create_shared_memory(subspace, n=n, ctx=ctx)) @@ -70,7 +72,8 @@ def create_dict_shared_memory(space, n=1, ctx=mp): ) -def read_from_shared_memory(shared_memory, space, n=1): +@singledispatch +def read_from_shared_memory(space, shared_memory, n=1): """Read the batch of observations from shared memory as a numpy array. Parameters @@ -97,45 +100,45 @@ def read_from_shared_memory(shared_memory, space, n=1): memory of `shared_memory`. Any changes to `shared_memory` are forwarded to `observations`, and vice-versa. To avoid any side-effect, use `np.copy`. """ - if isinstance(space, _BaseGymSpaces): - return read_base_from_shared_memory(shared_memory, space, n=n) - elif isinstance(space, Tuple): - return read_tuple_from_shared_memory(shared_memory, space, n=n) - elif isinstance(space, Dict): - return read_dict_from_shared_memory(shared_memory, space, n=n) - else: - raise CustomSpaceError( - "Cannot read from a shared memory for space with " - "type `{}`. Shared memory only supports " - "default Gym spaces (e.g. `Box`, `Tuple`, " - "`Dict`, etc...), and does not support custom " - "Gym spaces.".format(type(space)) - ) + raise CustomSpaceError( + "Cannot read from a shared memory for space with " + "type `{}`. Shared memory only supports " + "default Gym spaces (e.g. `Box`, `Tuple`, " + "`Dict`, etc...), and does not support custom " + "Gym spaces.".format(type(space)) + ) -def read_base_from_shared_memory(shared_memory, space, n=1): +@read_from_shared_memory.register(Box) +@read_from_shared_memory.register(Discrete) +@read_from_shared_memory.register(MultiDiscrete) +@read_from_shared_memory.register(MultiBinary) +def _read_base_from_shared_memory(space, shared_memory, n=1): return np.frombuffer(shared_memory.get_obj(), dtype=space.dtype).reshape( (n,) + space.shape ) -def read_tuple_from_shared_memory(shared_memory, space, n=1): +@read_from_shared_memory.register(Tuple) +def _read_tuple_from_shared_memory(space, shared_memory, n=1): return tuple( - read_from_shared_memory(memory, subspace, n=n) + read_from_shared_memory(subspace, memory, n=n) for (memory, subspace) in zip(shared_memory, space.spaces) ) -def read_dict_from_shared_memory(shared_memory, space, n=1): +@read_from_shared_memory.register(Dict) +def _read_dict_from_shared_memory(space, shared_memory, n=1): return OrderedDict( [ - (key, read_from_shared_memory(shared_memory[key], subspace, n=n)) + (key, read_from_shared_memory(subspace, shared_memory[key], n=n)) for (key, subspace) in space.spaces.items() ] ) -def write_to_shared_memory(index, value, shared_memory, space): +@singledispatch +def write_to_shared_memory(space, index, value, shared_memory): """Write the observation of a single environment into shared memory. Parameters @@ -157,23 +160,20 @@ def write_to_shared_memory(index, value, shared_memory, space): ------- `None` """ - if isinstance(space, _BaseGymSpaces): - write_base_to_shared_memory(index, value, shared_memory, space) - elif isinstance(space, Tuple): - write_tuple_to_shared_memory(index, value, shared_memory, space) - elif isinstance(space, Dict): - write_dict_to_shared_memory(index, value, shared_memory, space) - else: - raise CustomSpaceError( - "Cannot write to a shared memory for space with " - "type `{}`. Shared memory only supports " - "default Gym spaces (e.g. `Box`, `Tuple`, " - "`Dict`, etc...), and does not support custom " - "Gym spaces.".format(type(space)) - ) + raise CustomSpaceError( + "Cannot write to a shared memory for space with " + "type `{}`. Shared memory only supports " + "default Gym spaces (e.g. `Box`, `Tuple`, " + "`Dict`, etc...), and does not support custom " + "Gym spaces.".format(type(space)) + ) -def write_base_to_shared_memory(index, value, shared_memory, space): +@write_to_shared_memory.register(Box) +@write_to_shared_memory.register(Discrete) +@write_to_shared_memory.register(MultiDiscrete) +@write_to_shared_memory.register(MultiBinary) +def _write_base_to_shared_memory(space, index, value, shared_memory): size = int(np.prod(space.shape)) destination = np.frombuffer(shared_memory.get_obj(), dtype=space.dtype) np.copyto( @@ -182,11 +182,13 @@ def write_base_to_shared_memory(index, value, shared_memory, space): ) -def write_tuple_to_shared_memory(index, values, shared_memory, space): +@write_to_shared_memory.register(Tuple) +def _write_tuple_to_shared_memory(space, index, values, shared_memory): for value, memory, subspace in zip(values, shared_memory, space.spaces): - write_to_shared_memory(index, value, memory, subspace) + write_to_shared_memory(subspace, index, value, memory) -def write_dict_to_shared_memory(index, values, shared_memory, space): +@write_to_shared_memory.register(Dict) +def _write_dict_to_shared_memory(space, index, values, shared_memory): for key, subspace in space.spaces.items(): - write_to_shared_memory(index, values[key], shared_memory[key], subspace) + write_to_shared_memory(subspace, index, values[key], shared_memory[key]) diff --git a/gym/vector/utils/spaces.py b/gym/vector/utils/spaces.py index f55ca7049..3bc98b689 100644 --- a/gym/vector/utils/spaces.py +++ b/gym/vector/utils/spaces.py @@ -9,6 +9,7 @@ _BaseGymSpaces = (Box, Discrete, MultiDiscrete, MultiBinary) __all__ = ["_BaseGymSpaces", "batch_space", "iterate"] +@singledispatch def batch_space(space, n=1): """Create a (batched) space, containing multiple copies of a single space. @@ -36,20 +37,15 @@ def batch_space(space, n=1): >>> batch_space(space, n=5) Dict(position:Box(5, 3), velocity:Box(5, 2)) """ - if isinstance(space, _BaseGymSpaces): - return batch_space_base(space, n=n) - elif isinstance(space, Tuple): - return batch_space_tuple(space, n=n) - elif isinstance(space, Dict): - return batch_space_dict(space, n=n) - elif isinstance(space, Space): - return batch_space_custom(space, n=n) - else: - raise ValueError( - f"Cannot batch space with type `{type(space)}`. The space must be a valid `gym.Space` instance." - ) + raise ValueError( + f"Cannot batch space with type `{type(space)}`. The space must be a valid `gym.Space` instance." + ) +@batch_space.register(Box) +@batch_space.register(Discrete) +@batch_space.register(MultiDiscrete) +@batch_space.register(MultiBinary) def batch_space_base(space, n=1): if isinstance(space, Box): repeats = tuple([n] + [1] * space.low.ndim) @@ -71,10 +67,12 @@ def batch_space_base(space, n=1): raise ValueError(f"Space type `{type(space)}` is not supported.") +@batch_space.register(Tuple) def batch_space_tuple(space, n=1): return Tuple(tuple(batch_space(subspace, n=n) for subspace in space.spaces)) +@batch_space.register(Dict) def batch_space_dict(space, n=1): return Dict( OrderedDict( @@ -86,6 +84,7 @@ def batch_space_dict(space, n=1): ) +@batch_space.register(Space) def batch_space_custom(space, n=1): return Tuple(tuple(space for _ in range(n))) diff --git a/tests/vector/test_numpy_utils.py b/tests/vector/test_numpy_utils.py index 749fb8a37..2ccdebe08 100644 --- a/tests/vector/test_numpy_utils.py +++ b/tests/vector/test_numpy_utils.py @@ -46,7 +46,7 @@ def test_concatenate(space): samples = [space.sample() for _ in range(8)] array = create_empty_array(space, n=8) - concatenated = concatenate(samples, array, space) + concatenated = concatenate(space, samples, array) assert np.all(concatenated == array) assert_nested_equal(array, samples, n=8) diff --git a/tests/vector/test_shared_memory.py b/tests/vector/test_shared_memory.py index 32476f5f3..4f75caffe 100644 --- a/tests/vector/test_shared_memory.py +++ b/tests/vector/test_shared_memory.py @@ -108,7 +108,7 @@ def test_write_to_shared_memory(space): raise TypeError(f"Got unknown type `{type(lhs)}`.") def write(i, shared_memory, sample): - write_to_shared_memory(i, sample, shared_memory, space) + write_to_shared_memory(space, i, sample, shared_memory) shared_memory_n8 = create_shared_memory(space, n=8) samples = [space.sample() for _ in range(8)] @@ -155,10 +155,10 @@ def test_read_from_shared_memory(space): raise TypeError(f"Got unknown type `{type(space)}`") def write(i, shared_memory, sample): - write_to_shared_memory(i, sample, shared_memory, space) + write_to_shared_memory(space, i, sample, shared_memory) shared_memory_n8 = create_shared_memory(space, n=8) - memory_view_n8 = read_from_shared_memory(shared_memory_n8, space, n=8) + memory_view_n8 = read_from_shared_memory(space, shared_memory_n8, n=8) samples = [space.sample() for _ in range(8)] processes = [