Tutorials galleries (#258)

Manuel Goulão
2023-01-11 14:00:51 -06:00
committed by GitHub
parent 35fe9b0f13
commit 4e6dc3e420
14 changed files with 54 additions and 53 deletions


@@ -0,0 +1,2 @@
Gymnasium Basics
----------------


@@ -0,0 +1,510 @@
# fmt: off
"""
Make your own custom environment
================================
This documentation gives an overview of creating new environments, together with the
relevant wrappers, utilities and tests included in Gymnasium that are designed for
the creation of new environments. You can clone gym-examples to play
with the code that is presented here. We recommend that you use a virtual environment:
.. code:: console
git clone https://github.com/Farama-Foundation/gym-examples
cd gym-examples
python -m venv .env
source .env/bin/activate
pip install -e .
Subclassing gymnasium.Env
-------------------------
Before learning how to create your own environment, you should check out
`the documentation of Gymnasium's API </api/env>`__.
We will be concerned with a subset of gym-examples that looks like this:
.. code:: sh
gym-examples/
README.md
setup.py
gym_examples/
__init__.py
envs/
__init__.py
grid_world.py
wrappers/
__init__.py
relative_position.py
reacher_weighted_reward.py
discrete_action.py
clip_reward.py
To illustrate the process of subclassing ``gymnasium.Env``, we will
implement a very simplistic game, called ``GridWorldEnv``. We will write
the code for our custom environment in
``gym-examples/gym_examples/envs/grid_world.py``. The environment
consists of a 2-dimensional square grid of fixed size (specified via the
``size`` parameter during construction). The agent can move vertically
or horizontally between grid cells in each timestep. The goal of the
agent is to navigate to a target on the grid that has been placed
randomly at the beginning of the episode.
- Observations provide the location of the target and agent.
- There are 4 actions in our environment, corresponding to the
movements “right”, “up”, “left”, and “down”.
- The episode ends (termination) as soon as the agent has navigated to the
grid cell where the target is located.
- Rewards are binary and sparse, meaning that the immediate reward is
always zero, unless the agent has reached the target, in which case it is 1.
An episode in this environment (with ``size=5``) might look like this,
where the blue dot is the agent and the red square represents the
target.
Let us look at the source code of ``GridWorldEnv`` piece by piece:
"""
# %%
# Declaration and Initialization
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# Our custom environment will inherit from the abstract class
# ``gymnasium.Env``. You shouldn't forget to add the ``metadata``
# attribute to your class. There, you should specify the render-modes that
# are supported by your environment (e.g. ``"human"``, ``"rgb_array"``,
# ``"ansi"``) and the framerate at which your environment should be
# rendered. Every environment should support ``None`` as render-mode; you
# don't need to add it in the metadata. In ``GridWorldEnv``, we will
# support the modes “rgb_array” and “human” and render at 4 FPS.
#
# The ``__init__`` method of our environment will accept the integer
# ``size``, that determines the size of the square grid. We will set up
# some variables for rendering and define ``self.observation_space`` and
# ``self.action_space``. In our case, observations should provide
# information about the location of the agent and target on the
# 2-dimensional grid. We will choose to represent observations in the form
# of dictionaries with keys ``"agent"`` and ``"target"``. An observation
# may look like ``{"agent": array([1, 0]), "target": array([0, 3])}``.
# Since we have 4 actions in our environment (“right”, “up”, “left”,
# “down”), we will use ``Discrete(4)`` as an action space. Here is the
# declaration of ``GridWorldEnv`` and the implementation of ``__init__``:
import numpy as np
import pygame
import gymnasium as gym
from gymnasium import spaces
class GridWorldEnv(gym.Env):
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
def __init__(self, render_mode=None, size=5):
self.size = size # The size of the square grid
self.window_size = 512 # The size of the PyGame window
# Observations are dictionaries with the agent's and the target's location.
# Each location is encoded as an element of {0, ..., `size`-1}^2, i.e. a pair of grid coordinates (represented below with an integer `Box` space).
self.observation_space = spaces.Dict(
{
"agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
"target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
}
)
# We have 4 actions, corresponding to "right", "up", "left", "down"
self.action_space = spaces.Discrete(4)
"""
The following dictionary maps abstract actions from `self.action_space` to
the direction we will walk in if that action is taken.
I.e. 0 corresponds to "right", 1 to "up" etc.
"""
self._action_to_direction = {
0: np.array([1, 0]),
1: np.array([0, 1]),
2: np.array([-1, 0]),
3: np.array([0, -1]),
}
assert render_mode is None or render_mode in self.metadata["render_modes"]
self.render_mode = render_mode
"""
If human-rendering is used, `self.window` will be a reference
to the window that we draw to. `self.clock` will be a clock that is used
to ensure that the environment is rendered at the correct framerate in
human-mode. They will remain `None` until human-mode is used for the
first time.
"""
self.window = None
self.clock = None
# %%
# Constructing Observations From Environment States
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# Since we will need to compute observations both in ``reset`` and
# ``step``, it is often convenient to have a (private) method ``_get_obs``
# that translates the environment's state into an observation. However,
# this is not mandatory and you may as well compute observations in
# ``reset`` and ``step`` separately:
def _get_obs(self):
return {"agent": self._agent_location, "target": self._target_location}
# %%
# We can also implement a similar method for the auxiliary information
# that is returned by ``step`` and ``reset``. In our case, we would like
# to provide the Manhattan distance between the agent and the target:
def _get_info(self):
return {
"distance": np.linalg.norm(
self._agent_location - self._target_location, ord=1
)
}
# %%
# Oftentimes, info will also contain some data that is only available
# inside the ``step`` method (e.g. individual reward terms). In that case,
# we would have to update the dictionary that is returned by ``_get_info``
# in ``step``.
# %%
# Reset
# ~~~~~
#
# The ``reset`` method will be called to initiate a new episode. You may
# assume that the ``step`` method will not be called before ``reset`` has
# been called. Moreover, ``reset`` should be called whenever a done signal
# has been issued. Users may pass the ``seed`` keyword to ``reset`` to
# initialize any random number generator that is used by the environment
# to a deterministic state. It is recommended to use the random number
# generator ``self.np_random`` that is provided by the environment's base
# class, ``gymnasium.Env``. If you only use this RNG, you do not need to
# worry much about seeding, *but you need to remember to call
# ``super().reset(seed=seed)``* to make sure that ``gymnasium.Env``
# correctly seeds the RNG. Once this is done, we can randomly set the
# state of our environment. In our case, we randomly choose the agent's
# location and randomly sample the target's position until it does not
# coincide with the agent's position.
#
# The ``reset`` method should return a tuple of the initial observation
# and some auxiliary information. We can use the methods ``_get_obs`` and
# ``_get_info`` that we implemented earlier for that:
def reset(self, seed=None, options=None):
# We need the following line to seed self.np_random
super().reset(seed=seed)
# Choose the agent's location uniformly at random
self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)
# We will sample the target's location randomly until it does not coincide with the agent's location
self._target_location = self._agent_location
while np.array_equal(self._target_location, self._agent_location):
self._target_location = self.np_random.integers(
0, self.size, size=2, dtype=int
)
observation = self._get_obs()
info = self._get_info()
if self.render_mode == "human":
self._render_frame()
return observation, info
# %%
# Step
# ~~~~
#
# The ``step`` method usually contains most of the logic of your
# environment. It accepts an ``action``, computes the state of the
# environment after applying that action and returns the 5-tuple
# ``(observation, reward, terminated, truncated, info)``. Once the new state
# of the environment has been computed, we can check whether it is a terminal
# state and we set ``terminated`` accordingly. Since we are using sparse binary
# rewards in ``GridWorldEnv``, computing ``reward`` is trivial once we
# know ``terminated``. To gather ``observation`` and ``info``, we can again make
# use of ``_get_obs`` and ``_get_info``:
def step(self, action):
# Map the action (element of {0,1,2,3}) to the direction we walk in
direction = self._action_to_direction[action]
# We use `np.clip` to make sure we don't leave the grid
self._agent_location = np.clip(
self._agent_location + direction, 0, self.size - 1
)
# An episode is done iff the agent has reached the target
terminated = np.array_equal(self._agent_location, self._target_location)
reward = 1 if terminated else 0 # Binary sparse rewards
observation = self._get_obs()
info = self._get_info()
if self.render_mode == "human":
self._render_frame()
return observation, reward, terminated, False, info
# %%
# Rendering
# ~~~~~~~~~
#
# Here, we are using PyGame for rendering. A similar approach to rendering
# is used in many environments that are included with Gymnasium and you
# can use it as a skeleton for your own environments:
def render(self):
if self.render_mode == "rgb_array":
return self._render_frame()
def _render_frame(self):
if self.window is None and self.render_mode == "human":
pygame.init()
pygame.display.init()
self.window = pygame.display.set_mode(
(self.window_size, self.window_size)
)
if self.clock is None and self.render_mode == "human":
self.clock = pygame.time.Clock()
canvas = pygame.Surface((self.window_size, self.window_size))
canvas.fill((255, 255, 255))
pix_square_size = (
self.window_size / self.size
) # The size of a single grid square in pixels
# First we draw the target
pygame.draw.rect(
canvas,
(255, 0, 0),
pygame.Rect(
pix_square_size * self._target_location,
(pix_square_size, pix_square_size),
),
)
# Now we draw the agent
pygame.draw.circle(
canvas,
(0, 0, 255),
(self._agent_location + 0.5) * pix_square_size,
pix_square_size / 3,
)
# Finally, add some gridlines
for x in range(self.size + 1):
pygame.draw.line(
canvas,
0,
(0, pix_square_size * x),
(self.window_size, pix_square_size * x),
width=3,
)
pygame.draw.line(
canvas,
0,
(pix_square_size * x, 0),
(pix_square_size * x, self.window_size),
width=3,
)
if self.render_mode == "human":
# The following line copies our drawings from `canvas` to the visible window
self.window.blit(canvas, canvas.get_rect())
pygame.event.pump()
pygame.display.update()
# We need to ensure that human-rendering occurs at the predefined framerate.
# The following line will automatically add a delay to keep the framerate stable.
self.clock.tick(self.metadata["render_fps"])
else: # rgb_array
return np.transpose(
np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
)
# %%
# Close
# ~~~~~
#
# The ``close`` method should close any open resources that were used by
# the environment. In many cases, you don't actually have to bother to
# implement this method. However, in our example ``render_mode`` may be
# ``"human"`` and we might need to close the window that has been opened:
def close(self):
if self.window is not None:
pygame.display.quit()
pygame.quit()
# %%
# In other environments ``close`` might also close files that were opened
# or release other resources. You shouldn't interact with the environment
# after having called ``close``.
# %%
# Registering Envs
# ----------------
#
# In order for the custom environments to be detected by Gymnasium, they
# must be registered as follows. We will choose to put this code in
# ``gym-examples/gym_examples/__init__.py``.
#
# .. code:: python
#
# from gymnasium.envs.registration import register
#
# register(
# id="gym_examples/GridWorld-v0",
# entry_point="gym_examples.envs:GridWorldEnv",
# max_episode_steps=300,
# )
# %%
# The environment ID consists of three components, two of which are
# optional: an optional namespace (here: ``gym_examples``), a mandatory
# name (here: ``GridWorld``) and an optional but recommended version
# (here: v0). It might have also been registered as ``GridWorld-v0`` (the
# recommended approach), ``GridWorld`` or ``gym_examples/GridWorld``, and
# the appropriate ID should then be used during environment creation.
#
# The keyword argument ``max_episode_steps=300`` will ensure that
# GridWorld environments that are instantiated via ``gymnasium.make`` will
# be wrapped in a ``TimeLimit`` wrapper (see `the wrapper
# documentation </api/wrappers>`__ for more information). The episode will
# then end if the agent has reached the target *or* 300 steps have been
# executed in the current episode. To distinguish truncation from
# termination, you can check the ``truncated`` value returned by ``step``.
#
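# As a quick illustration (a minimal sketch, assuming the registration code
# above has been run by importing ``gym_examples``), the ``truncated`` value
# becomes ``True`` once the 300-step limit is hit:
#
# .. code:: python
#
#     import gymnasium
#     import gym_examples  # running this import registers the environment
#
#     env = gymnasium.make("gym_examples/GridWorld-v0")
#     obs, info = env.reset(seed=0)
#     terminated = truncated = False
#     while not (terminated or truncated):
#         obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
#     # `terminated` is True if the agent reached the target,
#     # `truncated` is True if the 300-step limit ended the episode first.
#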
# Apart from ``id`` and ``entry_point``, you may pass the following
# additional keyword arguments to ``register``:
#
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
# | Name | Type | Default | Description |
# +======================+===========+===========+===============================================================================================================+
# | ``reward_threshold`` | ``float`` | ``None`` | The reward threshold before the task is considered solved |
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
# | ``nondeterministic`` | ``bool`` | ``False`` | Whether this environment is non-deterministic even after seeding |
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
# | ``max_episode_steps``| ``int`` | ``None`` | The maximum number of steps that an episode can consist of. If not ``None``, a ``TimeLimit`` wrapper is added |
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
# | ``order_enforce`` | ``bool`` | ``True`` | Whether to wrap the environment in an ``OrderEnforcing`` wrapper |
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
# | ``autoreset`` | ``bool`` | ``False`` | Whether to wrap the environment in an ``AutoResetWrapper`` |
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
# | ``kwargs`` | ``dict`` | ``{}`` | The default kwargs to pass to the environment class |
# +----------------------+-----------+-----------+---------------------------------------------------------------------------------------------------------------+
#
# Most of these keywords (except for ``max_episode_steps``,
# ``order_enforce`` and ``kwargs``) do not alter the behavior of
# environment instances but merely provide some extra information about
# your environment. After registration, our custom ``GridWorldEnv``
# environment can be created with
# ``env = gymnasium.make('gym_examples/GridWorld-v0')``.
#
# ``gym-examples/gym_examples/envs/__init__.py`` should have:
#
# .. code:: python
#
# from gym_examples.envs.grid_world import GridWorldEnv
#
# If your environment is not registered, you may optionally pass a module
# to import that would register your environment before creating it like
# this - ``env = gymnasium.make('module:Env-v0')``, where ``module``
# contains the registration code. For the GridWorld env, the registration
# code is run by importing ``gym_examples``, so if it were not possible to
# import gym_examples explicitly, you could register while making by
# ``env = gymnasium.make('gym_examples:gym_examples/GridWorld-v0')``. This
# is especially useful when you're allowed to pass only the environment ID
# into a third-party codebase (e.g. a learning library). This lets you
# register your environment without needing to edit the library's source
# code.
# %%
# Creating a Package
# ------------------
#
# The last step is to structure our code as a Python package. This
# involves configuring ``gym-examples/setup.py``. A minimal example of how
# to do so is as follows:
#
# .. code:: python
#
# from setuptools import setup
#
# setup(
# name="gym_examples",
# version="0.0.1",
# install_requires=["gymnasium==0.26.0", "pygame==2.1.0"],
# )
#
# Creating Environment Instances
# ------------------------------
#
# After you have installed your package locally with
# ``pip install -e gym-examples``, you can create an instance of the
# environment via:
#
# .. code:: python
#
# import gym_examples
# env = gymnasium.make('gym_examples/GridWorld-v0')
#
# You can also pass keyword arguments of your environment's constructor to
# ``gymnasium.make`` to customize the environment. In our case, we could
# do:
#
# .. code:: python
#
# env = gymnasium.make('gym_examples/GridWorld-v0', size=10)
#
# Sometimes, you may find it more convenient to skip registration and call
# the environment's constructor yourself. Some may find this approach more
# Pythonic, and environments that are instantiated like this are also
# perfectly fine (but remember to add wrappers as well!).
#
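# For example, a minimal sketch (assuming the package layout shown above) of
# instantiating the environment directly and adding a time limit by hand:
#
# .. code:: python
#
#     from gym_examples.envs.grid_world import GridWorldEnv
#     from gymnasium.wrappers import TimeLimit
#
#     env = TimeLimit(GridWorldEnv(size=10), max_episode_steps=300)
#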
# Using Wrappers
# --------------
#
# Oftentimes, we want to use different variants of a custom environment,
# or we want to modify the behavior of an environment that is provided by
# Gymnasium or some other party. Wrappers allow us to do this without
# changing the environment implementation or adding any boilerplate code.
# Check out the `wrapper documentation </api/wrappers/>`__ for details on
# how to use wrappers and instructions for implementing your own. In our
# example, observations cannot be used directly in learning code because
# they are dictionaries. However, we don't actually need to touch our
# environment implementation to fix this! We can simply add a wrapper on
# top of environment instances to flatten observations into a single
# array:
#
# .. code:: python
#
# import gym_examples
# from gymnasium.wrappers import FlattenObservation
#
# env = gymnasium.make('gym_examples/GridWorld-v0')
# wrapped_env = FlattenObservation(env)
# print(wrapped_env.reset()) # E.g. [3 0 3 3], {}
#
# Wrappers have the big advantage that they make environments highly
# modular. For instance, instead of flattening the observations from
# GridWorld, you might only want to look at the relative position of the
# target and the agent. In the section on
# `ObservationWrappers </api/wrappers/#observationwrapper>`__ we have
# implemented a wrapper that does this job. This wrapper is also available
# in gym-examples:
#
# .. code:: python
#
# import gym_examples
# from gym_examples.wrappers import RelativePosition
#
# env = gymnasium.make('gym_examples/GridWorld-v0')
# wrapped_env = RelativePosition(env)
# print(wrapped_env.reset()) # E.g. [-3 3], {}
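#
# Finally, here is a minimal sketch of a random-agent rollout with the
# registered environment (hypothetical usage; replace the random action with
# your own policy):
#
# .. code:: python
#
#     import gymnasium
#     import gym_examples
#
#     env = gymnasium.make('gym_examples/GridWorld-v0')
#     obs, info = env.reset(seed=42)
#     for _ in range(1000):
#         action = env.action_space.sample()  # random policy as a placeholder
#         obs, reward, terminated, truncated, info = env.step(action)
#         if terminated or truncated:
#             obs, info = env.reset()
#     env.close()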


@@ -0,0 +1,80 @@
"""
Handling Time Limits
====================
When using Gymnasium environments with reinforcement learning code, a common problem is that time limits are handled incorrectly. The ``done`` signal received (in previous versions of OpenAI Gym < 0.26) from ``env.step`` indicated whether an episode has ended. However, this signal did not distinguish whether the episode ended due to ``termination`` or ``truncation``.
Termination
-----------
Termination refers to the episode ending after reaching a terminal state that is defined as part of the environment
definition. Examples are task success, task failure, the robot falling down, etc. Notably, this also includes episodes
ending in finite-horizon environments due to a time-limit inherent to the environment. Note that to preserve the Markov
property, a representation of the remaining time must be present in the agent's observation in finite-horizon environments.
`(Reference) <https://arxiv.org/abs/1712.00378>`_
Truncation
----------
Truncation refers to the episode ending after an externally defined condition (that is outside the scope of the Markov
Decision Process). This could be a time-limit, a robot going out of bounds etc.
An infinite-horizon environment is an obvious example of where this is needed. We cannot wait forever for the episode
to complete, so we set a practical time-limit after which we forcibly halt the episode. The last state in this case is
not a terminal state since it has a non-zero transition probability of moving to another state as per the Markov
Decision Process that defines the RL problem. This is also different from time-limits in finite horizon environments
as the agent in this case has no idea about this time-limit.
"""
# %%
# Importance in learning code
# ---------------------------
# Bootstrapping (using one or more estimated values of a variable to update estimates of the same variable) is a key
# aspect of Reinforcement Learning. A value function will tell you how much discounted reward you will get from a
# particular state if you follow a given policy. When an episode stops at any given point, by looking at the value of
# the final state, the agent is able to estimate how much discounted reward could have been obtained if the episode had
# continued. This is an example of handling truncation.
#
# More formally, a common example of bootstrapping in RL is updating the estimate of the Q-value function,
#
# .. math::
# Q_{target}(o_t, a_t) = r_t + \gamma \cdot \max_{a'} Q(o_{t+1}, a')
#
#
# In classical RL, the new ``Q`` estimate is a weighted average of the previous ``Q`` estimate and ``Q_target`` while in Deep
# Q-Learning, the error between ``Q_target`` and the previous ``Q`` estimate is minimized.
#
# However, at the terminal state, bootstrapping is not done,
#
# .. math::
# Q_{target}(o_t, a_t) = r_t
#
# This is where the distinction between termination and truncation becomes important. When an episode ends due to
# termination we don't bootstrap, when it ends due to truncation, we bootstrap.
#
# While using Gymnasium environments, the ``done`` signal (default for < v0.26) is frequently used to determine whether to
# bootstrap or not. However, this is incorrect since it does not differentiate between termination and truncation.
#
# A simple example of value functions is shown below. This is an illustrative example and not part of any specific algorithm.
#
# .. code:: python
#
# # INCORRECT
# vf_target = rew + gamma * (1 - done) * vf_next_state
#
# This is incorrect in the case of episode ending due to a truncation, where bootstrapping needs to happen but it doesn't.
# %%
# Solution
# ----------
#
# From v0.26 onwards, Gymnasium's ``env.step`` API returns both termination and truncation information explicitly.
# In the previous version truncation information was supplied through the info key ``TimeLimit.truncated``.
# The correct way to handle terminations and truncations now is:
#
# .. code:: python
#
# # terminated = done and 'TimeLimit.truncated' not in info
# # This was needed in previous versions.
#
# vf_target = rew + gamma * (1 - terminated) * vf_next_state
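#
# Putting this together, a minimal, illustrative data-collection loop (not part
# of any specific algorithm; ``vf`` and ``gamma`` below are stand-ins for your
# value function and discount factor) might look like:
#
# .. code:: python
#
#     import gymnasium as gym
#
#     gamma = 0.99
#     vf = lambda obs: 0.0  # stand-in for a learned value function
#
#     env = gym.make("CartPole-v1")
#     obs, info = env.reset(seed=0)
#     terminated = truncated = False
#     while not (terminated or truncated):
#         next_obs, rew, terminated, truncated, info = env.step(env.action_space.sample())
#         # bootstrap unless the episode *terminated*; a truncated episode still bootstraps
#         vf_target = rew + gamma * (1 - terminated) * vf(next_obs)
#         obs = next_obs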


@@ -0,0 +1,137 @@
"""
Implementing Custom Wrappers
============================
In this tutorial we will describe how to implement your own custom wrappers.
Wrappers are a great way to add functionality to your environments in a modular way.
This will save you a lot of boilerplate code.
We will show how to create a wrapper by
- Inheriting from :class:`gymnasium.ObservationWrapper`
- Inheriting from :class:`gymnasium.ActionWrapper`
- Inheriting from :class:`gymnasium.RewardWrapper`
- Inheriting from :class:`gymnasium.Wrapper`
Before following this tutorial, make sure to check out the docs of the :mod:`gymnasium.wrappers` module.
"""
# %%
# Inheriting from :class:`gymnasium.ObservationWrapper`
# -----------------------------------------------------
# Observation wrappers are useful if you want to apply some function to the observations that are returned
# by an environment. If you implement an observation wrapper, you only need to define this transformation
# by implementing the :meth:`gymnasium.ObservationWrapper.observation` method. Moreover, you should remember to
# update the observation space, if the transformation changes the shape of observations (e.g. by transforming
# dictionaries into numpy arrays, as in the following example).
#
# Imagine you have a 2D navigation task where the environment returns dictionaries as observations with
# keys ``"agent_position"`` and ``"target_position"``. A common thing to do might be to throw away some degrees of
# freedom and only consider the position of the target relative to the agent, i.e.
# ``observation["target_position"] - observation["agent_position"]``. For this, you could implement an
# observation wrapper like this:
import numpy as np
from gymnasium import ActionWrapper, ObservationWrapper, RewardWrapper, Wrapper
import gymnasium as gym
from gymnasium.spaces import Box, Discrete
class RelativePosition(ObservationWrapper):
def __init__(self, env):
super().__init__(env)
self.observation_space = Box(shape=(2,), low=-np.inf, high=np.inf)
def observation(self, obs):
return obs["target"] - obs["agent"]
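# %%
# A hypothetical usage sketch (assuming the ``gym_examples`` package from the
# previous tutorial is installed, so that its GridWorld environment is registered):
#
# .. code:: python
#
#     import gym_examples
#
#     env = gym.make("gym_examples/GridWorld-v0")
#     wrapped_env = RelativePosition(env)
#     print(wrapped_env.reset())  # E.g. (array([3, -1]), {})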
# %%
# Inheriting from :class:`gymnasium.ActionWrapper`
# ------------------------------------------------
# Action wrappers can be used to apply a transformation to actions before applying them to the environment.
# If you implement an action wrapper, you need to define that transformation by implementing
# :meth:`gymnasium.ActionWrapper.action`. Moreover, you should specify the domain of that transformation
# by updating the action space of the wrapper.
#
# Let's say you have an environment with an action space of type :class:`gymnasium.spaces.Box`, but you would only like
# to use a finite subset of actions. Then, you might want to implement the following wrapper:
class DiscreteActions(ActionWrapper):
def __init__(self, env, disc_to_cont):
super().__init__(env)
self.disc_to_cont = disc_to_cont
self.action_space = Discrete(len(disc_to_cont))
def action(self, act):
return self.disc_to_cont[act]
if __name__ == "__main__":
env = gym.make("LunarLanderContinuous-v2")
wrapped_env = DiscreteActions(
env, [np.array([1, 0]), np.array([-1, 0]), np.array([0, 1]), np.array([0, -1])]
)
print(wrapped_env.action_space) # Discrete(4)
# %%
# Inheriting from :class:`gymnasium.RewardWrapper`
# ------------------------------------------------
# Reward wrappers are used to transform the reward that is returned by an environment.
# As for the previous wrappers, you need to specify that transformation by implementing the
# :meth:`gymnasium.RewardWrapper.reward` method. Also, you might want to update the reward range of the wrapper.
#
# Let us look at an example: Sometimes (especially when we do not have control over the reward
# because it is intrinsic), we want to clip the reward to a range to gain some numerical stability.
# To do that, we could, for instance, implement the following wrapper:
from typing import SupportsFloat
class ClipReward(RewardWrapper):
def __init__(self, env, min_reward, max_reward):
super().__init__(env)
self.min_reward = min_reward
self.max_reward = max_reward
self.reward_range = (min_reward, max_reward)
def reward(self, r: SupportsFloat) -> SupportsFloat:
return np.clip(r, self.min_reward, self.max_reward)
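# %%
# A short usage sketch (illustrative, with an arbitrary clipping range):
#
# .. code:: python
#
#     env = gym.make("CartPole-v1")
#     wrapped_env = ClipReward(env, 0.0, 0.5)
#     # every reward returned by ``wrapped_env.step`` is now clipped to [0.0, 0.5]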
# %%
# Inheriting from :class:`gymnasium.Wrapper`
# ------------------------------------------
# Sometimes you might need to implement a wrapper that does some more complicated modifications (e.g. modify the
# reward based on data in ``info`` or change the rendering behavior).
# Such wrappers can be implemented by inheriting from :class:`gymnasium.Wrapper`.
#
# - You can set a new action or observation space by defining ``self.action_space`` or ``self.observation_space`` in ``__init__``, respectively
# - You can set new metadata and reward range by defining ``self.metadata`` and ``self.reward_range`` in ``__init__``, respectively
# - You can override :meth:`gymnasium.Wrapper.step`, :meth:`gymnasium.Wrapper.render`, :meth:`gymnasium.Wrapper.close` etc.
# If you do this, you can access the environment that was passed
# to your wrapper (which *still* might be wrapped in some other wrapper) by accessing the attribute :attr:`env`.
#
# Let's also take a look at an example for this case. Most MuJoCo environments return a reward that consists
# of different terms: For instance, there might be a term that rewards the agent for completing the task and one term that
# penalizes large actions (i.e. energy usage). Usually, you can pass weight parameters for those terms during
# initialization of the environment. However, *Reacher* does not allow you to do this! Nevertheless, all individual terms
# of the reward are returned in `info`, so let us build a wrapper for Reacher that allows us to weight those terms:
class ReacherRewardWrapper(Wrapper):
def __init__(self, env, reward_dist_weight, reward_ctrl_weight):
super().__init__(env)
self.reward_dist_weight = reward_dist_weight
self.reward_ctrl_weight = reward_ctrl_weight
def step(self, action):
obs, _, terminated, truncated, info = self.env.step(action)
reward = (
self.reward_dist_weight * info["reward_dist"]
+ self.reward_ctrl_weight * info["reward_ctrl"]
)
return obs, reward, terminated, truncated, info
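# %%
# A hypothetical usage sketch (assuming the MuJoCo-based ``Reacher-v4``
# environment is installed):
#
# .. code:: python
#
#     env = gym.make("Reacher-v4")
#     wrapped_env = ReacherRewardWrapper(env, reward_dist_weight=1.0, reward_ctrl_weight=0.1)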


@@ -0,0 +1,710 @@
"""
Training A2C with Vector Envs and Domain Randomization
======================================================
"""
# %%
# Introduction
# ------------
#
# In this tutorial, you'll learn how to use vectorized environments to train an Advantage Actor-Critic agent.
# We are going to use A2C, which is the synchronous version of the A3C algorithm [1].
#
# Vectorized environments [3] can help to achieve quicker and more robust training by allowing multiple instances
# of the same environment to run in parallel (on multiple CPUs). This can significantly reduce the variance and thus speed up training.
#
# We will implement an Advantage Actor-Critic from scratch to look at how you can feed batched states into your networks to get a vector of actions
# (one action per environment) and calculate the losses for actor and critic on minibatches of transitions.
# Each minibatch contains the transitions of one sampling phase: `n_steps_per_update` steps are executed in `n_envs` environments in parallel
# (multiply the two to get the number of transitions in a minibatch). After each sampling phase, the losses are calculated and one gradient step is executed.
# To calculate the advantages, we are going to use the Generalized Advantage Estimation (GAE) method [2], which balances the tradeoff
# between variance and bias of the advantage estimates.
#
# The A2C agent class is initialized with the number of features of the input state, the number of actions the agent can take,
# the learning rates and the number of environments that run in parallel to collect experiences. The actor and critic networks are defined
# and their respective optimizers are initialized. The forward pass of the networks takes in a batched vector of states and returns a tensor of state values
# and a tensor of action logits. The select_action method returns a tuple of the chosen actions, the log-probs of those actions, and the state values for each action.
# In addition, it also returns the entropy of the policy distribution, which is subtracted from the loss later (with a weighting factor `ent_coef`) to encourage exploration.
#
# The get_losses function calculates the losses for the actor and critic networks (using GAE), which are then updated using the update_parameters function.
#
# %%
#
# ------------------------------
#
# Author: Till Zemann
# License: MIT License
from __future__ import annotations
import os
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from tqdm import tqdm
import gymnasium as gym
# %%
# Advantage Actor-Critic (A2C)
# ----------------------------
#
# The Actor-Critic combines elements of value-based and policy-based methods. In A2C, the agent has two separate neural networks:
# a critic network that estimates the state-value function, and an actor network that outputs logits for a categorical probability distribution over all actions.
# The critic network is trained to minimize the mean squared error between the predicted state values and the actual returns received by the agent
# (this is equivalent to minimizing the squared advantages, because the advantage of an action is the difference between the return and the state-value: A(s,a) = Q(s,a) - V(s)).
# The actor network is trained to maximize the expected return by selecting actions that have high expected values according to the critic network.
#
# The focus of this tutorial will not be on the details of A2C itself. Instead, the tutorial will focus on how to use vectorized environments
# and domain randomization to accelerate the training process for A2C (and other reinforcement learning algorithms).
#
# %%
#
# ------------------------------
#
class A2C(nn.Module):
"""
(Synchronous) Advantage Actor-Critic agent class
Args:
n_features: The number of features of the input state.
n_actions: The number of actions the agent can take.
device: The device to run the computations on (running on a GPU might be quicker for larger Neural Nets,
for this code CPU is totally fine).
critic_lr: The learning rate for the critic network (should usually be larger than the actor_lr).
actor_lr: The learning rate for the actor network.
n_envs: The number of environments that run in parallel (on multiple CPUs) to collect experiences.
"""
def __init__(
self,
n_features: int,
n_actions: int,
device: torch.device,
critic_lr: float,
actor_lr: float,
n_envs: int,
) -> None:
"""Initializes the actor and critic networks and their respective optimizers."""
super().__init__()
self.device = device
self.n_envs = n_envs
critic_layers = [
nn.Linear(n_features, 32),
nn.ReLU(),
nn.Linear(32, 32),
nn.ReLU(),
nn.Linear(32, 1), # estimate V(s)
]
actor_layers = [
nn.Linear(n_features, 32),
nn.ReLU(),
nn.Linear(32, 32),
nn.ReLU(),
nn.Linear(
32, n_actions
), # estimate action logits (will be fed into a softmax later)
]
# define actor and critic networks
self.critic = nn.Sequential(*critic_layers).to(self.device)
self.actor = nn.Sequential(*actor_layers).to(self.device)
# define optimizers for actor and critic
self.critic_optim = optim.RMSprop(self.critic.parameters(), lr=critic_lr)
self.actor_optim = optim.RMSprop(self.actor.parameters(), lr=actor_lr)
def forward(self, x: np.ndarray) -> tuple[torch.Tensor, torch.Tensor]:
"""
Forward pass of the networks.
Args:
x: A batched vector of states.
Returns:
state_values: A tensor with the state values, with shape [n_envs,].
action_logits_vec: A tensor with the action logits, with shape [n_envs, n_actions].
"""
x = torch.Tensor(x).to(self.device)
state_values = self.critic(x) # shape: [n_envs,]
action_logits_vec = self.actor(x) # shape: [n_envs, n_actions]
return (state_values, action_logits_vec)
def select_action(
self, x: np.ndarray
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Returns a tuple of the chosen actions and the log-probs of those actions.
Args:
x: A batched vector of states.
Returns:
actions: A tensor with the actions, with shape [n_steps_per_update, n_envs].
action_log_probs: A tensor with the log-probs of the actions, with shape [n_steps_per_update, n_envs].
state_values: A tensor with the state values, with shape [n_steps_per_update, n_envs].
"""
state_values, action_logits = self.forward(x)
action_pd = torch.distributions.Categorical(
logits=action_logits
) # implicitly uses softmax
actions = action_pd.sample()
action_log_probs = action_pd.log_prob(actions)
entropy = action_pd.entropy()
return (actions, action_log_probs, state_values, entropy)
def get_losses(
self,
rewards: torch.Tensor,
action_log_probs: torch.Tensor,
value_preds: torch.Tensor,
entropy: torch.Tensor,
masks: torch.Tensor,
gamma: float,
lam: float,
ent_coef: float,
device: torch.device,
) -> tuple[torch.Tensor, torch.Tensor]:
"""
Computes the loss of a minibatch (transitions collected in one sampling phase) for actor and critic
using Generalized Advantage Estimation (GAE) to compute the advantages (https://arxiv.org/abs/1506.02438).
Args:
rewards: A tensor with the rewards for each time step in the episode, with shape [n_steps_per_update, n_envs].
action_log_probs: A tensor with the log-probs of the actions taken at each time step in the episode, with shape [n_steps_per_update, n_envs].
value_preds: A tensor with the state value predictions for each time step in the episode, with shape [n_steps_per_update, n_envs].
masks: A tensor with the masks for each time step in the episode, with shape [n_steps_per_update, n_envs].
gamma: The discount factor.
lam: The GAE hyperparameter. (lam=1 corresponds to Monte-Carlo sampling with high variance and no bias,
and lam=0 corresponds to normal TD-Learning that has a low variance but is biased
because the estimates are generated by a Neural Net).
device: The device to run the computations on (e.g. CPU or GPU).
Returns:
critic_loss: The critic loss for the minibatch.
actor_loss: The actor loss for the minibatch.
"""
T = len(rewards)
advantages = torch.zeros(T, self.n_envs, device=device)
# compute the advantages using GAE
gae = 0.0
for t in reversed(range(T - 1)):
td_error = (
rewards[t] + gamma * masks[t] * value_preds[t + 1] - value_preds[t]
)
gae = td_error + gamma * lam * masks[t] * gae
advantages[t] = gae
# calculate the loss of the minibatch for actor and critic
critic_loss = advantages.pow(2).mean()
# give a bonus for higher entropy to encourage exploration
actor_loss = (
-(advantages.detach() * action_log_probs).mean() - ent_coef * entropy.mean()
)
return (critic_loss, actor_loss)
def update_parameters(
self, critic_loss: torch.Tensor, actor_loss: torch.Tensor
) -> None:
"""
Updates the parameters of the actor and critic networks.
Args:
critic_loss: The critic loss.
actor_loss: The actor loss.
"""
self.critic_optim.zero_grad()
critic_loss.backward()
self.critic_optim.step()
self.actor_optim.zero_grad()
actor_loss.backward()
self.actor_optim.step()
# %%
# Using Vectorized Environments
# -----------------------------
#
# When you calculate the losses for the two neural networks over only one epoch, they might have a high variance. With vectorized environments,
# we can run `n_envs` environments in parallel and thus get up to a linear speedup (meaning that in theory, we collect samples `n_envs` times quicker)
# that we can use to calculate the loss for the current policy and critic network. When we use more samples to calculate the loss,
# it will have a lower variance and therefore lead to quicker learning.
#
# A2C is a synchronous method, meaning that the parameter updates to the networks take place deterministically (after each sampling phase),
# but we can still make use of asynchronous vector envs to spawn multiple processes for parallel environment execution.
#
# The simplest way to create vector environments is by calling `gym.vector.make`, which creates multiple instances of the same environment:
#
envs = gym.vector.make("LunarLander-v2", num_envs=3, max_episode_steps=600)
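# %%
# As a quick illustration (a sketch, not executed here), the batched API returns
# stacked arrays with a leading dimension of ``num_envs``:
#
# .. code:: python
#
#     observations, infos = envs.reset(seed=42)
#     # observations.shape == (3, 8) for LunarLander-v2 with num_envs=3
#     observations, rewards, terminateds, truncateds, infos = envs.step(
#         envs.action_space.sample()
#     )
#     # rewards.shape == (3,)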
# %%
# Domain Randomization
# --------------------
#
# If we want to randomize the environment for training to get more robust agents (that can deal with different parameterizations of an environment
# and therefore might have a higher degree of generalization), we can set the desired parameters manually or use a pseudo-random number generator to generate them.
#
# Manually setting up 3 parallel 'LunarLander-v2' envs with different parameters:
envs = gym.vector.AsyncVectorEnv(
[
lambda: gym.make(
"LunarLander-v2",
gravity=-10.0,
enable_wind=True,
wind_power=15.0,
turbulence_power=1.5,
max_episode_steps=600,
),
lambda: gym.make(
"LunarLander-v2",
gravity=-9.8,
enable_wind=True,
wind_power=10.0,
turbulence_power=1.3,
max_episode_steps=600,
),
lambda: gym.make(
"LunarLander-v2", gravity=-7.0, enable_wind=False, max_episode_steps=600
),
]
)
# %%
#
# ------------------------------
#
# Randomly generating the parameters for 3 parallel 'LunarLander-v2' envs, using `np.clip` to stay in the recommended parameter space:
#
envs = gym.vector.AsyncVectorEnv(
[
lambda: gym.make(
"LunarLander-v2",
gravity=np.clip(
np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
),
enable_wind=np.random.choice([True, False]),
wind_power=np.clip(
np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
),
turbulence_power=np.clip(
np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
),
max_episode_steps=600,
)
for i in range(3)
]
)
# %%
#
# ------------------------------
#
# Here we are using normal distributions with the standard parameterization of the environment as the mean and an arbitrary standard deviation (scale).
# Depending on the problem, you can experiment with higher variance and use different distributions as well.
#
# If you are training on the same `n_envs` environments for the entire training time, and `n_envs` is a relatively low number
# (in proportion to how complex the environment is), you might still get some overfitting to the specific parameterizations that you picked.
# To mitigate this, you can either pick a high number of randomly parameterized environments or remake your environments every couple of sampling phases
# to generate a new set of pseudo-random parameters.
#
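# A minimal sketch of such a periodic remake (using a hypothetical helper
# function, not part of the tutorial code):
#
# .. code:: python
#
#     def make_randomized_envs(n_envs):
#         return gym.vector.AsyncVectorEnv(
#             [
#                 lambda: gym.make(
#                     "LunarLander-v2",
#                     gravity=np.clip(np.random.normal(-10.0, 1.0), -11.99, -0.01),
#                     enable_wind=np.random.choice([True, False]),
#                     wind_power=np.clip(np.random.normal(15.0, 1.0), 0.01, 19.99),
#                     turbulence_power=np.clip(np.random.normal(1.5, 0.5), 0.01, 1.99),
#                     max_episode_steps=600,
#                 )
#                 for _ in range(n_envs)
#             ]
#         )
#
#     # e.g. every 50 sampling phases: envs.close(); envs = make_randomized_envs(n_envs)
#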
# %%
# Setup
# -----
#
# environment hyperparams
n_envs = 10
n_updates = 1000
n_steps_per_update = 128
randomize_domain = False
# agent hyperparams
gamma = 0.999
lam = 0.95 # hyperparameter for GAE
ent_coef = 0.01 # coefficient for the entropy bonus (to encourage exploration)
actor_lr = 0.001
critic_lr = 0.005
# Note: the actor has a slower learning rate so that the value targets become
# more stationary and are therefore easier to estimate for the critic
# environment setup
if randomize_domain:
envs = gym.vector.AsyncVectorEnv(
[
lambda: gym.make(
"LunarLander-v2",
gravity=np.clip(
np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
),
enable_wind=np.random.choice([True, False]),
wind_power=np.clip(
np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
),
turbulence_power=np.clip(
np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
),
max_episode_steps=600,
)
for i in range(n_envs)
]
)
else:
envs = gym.vector.make("LunarLander-v2", num_envs=n_envs, max_episode_steps=600)
obs_shape = envs.single_observation_space.shape[0]
action_shape = envs.single_action_space.n
# set the device
use_cuda = False
if use_cuda:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
device = torch.device("cpu")
# init the agent
agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)
# %%
# Training the A2C Agent
# ----------------------
#
# For our training loop, we are using the `RecordEpisodeStatistics` wrapper to record the episode lengths and returns and we are also saving
# the losses and entropies to plot them after the agent finished training.
#
# You may notice that we don't reset the vectorized envs at the start of each episode like we would usually do.
# This is because each environment resets automatically once the episode finishes (each environment takes a different number of timesteps to finish
# an episode because of the random seeds). As a result, we are also not collecting data in `episodes`, but rather just playing a certain number of steps
# (`n_steps_per_update`) in each environment (as an example, this could mean that we play 20 timesteps to finish an episode and then
# use the rest of the timesteps to begin a new one).
#
# create a wrapper environment to save episode returns and episode lengths
envs_wrapper = gym.wrappers.RecordEpisodeStatistics(envs, deque_size=n_envs * n_updates)
critic_losses = []
actor_losses = []
entropies = []
# use tqdm to get a progress bar for training
for sample_phase in tqdm(range(n_updates)):
# we don't have to reset the envs, they just continue playing
# until the episode is over and then reset automatically
# reset lists that collect experiences of an episode (sample phase)
ep_value_preds = torch.zeros(n_steps_per_update, n_envs, device=device)
ep_rewards = torch.zeros(n_steps_per_update, n_envs, device=device)
ep_action_log_probs = torch.zeros(n_steps_per_update, n_envs, device=device)
masks = torch.zeros(n_steps_per_update, n_envs, device=device)
# at the start of training reset all envs to get an initial state
if sample_phase == 0:
states, info = envs_wrapper.reset(seed=42)
# play n steps in our parallel environments to collect data
for step in range(n_steps_per_update):
# select an action A_{t} using S_{t} as input for the agent
actions, action_log_probs, state_value_preds, entropy = agent.select_action(
states
)
# perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
states, rewards, terminated, truncated, infos = envs_wrapper.step(
actions.numpy()
)
ep_value_preds[step] = torch.squeeze(state_value_preds)
ep_rewards[step] = torch.tensor(rewards, device=device)
ep_action_log_probs[step] = action_log_probs
# add a mask (for the return calculation later);
# for each env the mask is 1 if the episode is ongoing and 0 if it is terminated (not by truncation!)
masks[step] = torch.tensor([not term for term in terminated])
# calculate the losses for actor and critic
critic_loss, actor_loss = agent.get_losses(
ep_rewards,
ep_action_log_probs,
ep_value_preds,
entropy,
masks,
gamma,
lam,
ent_coef,
device,
)
# update the actor and critic networks
agent.update_parameters(critic_loss, actor_loss)
# log the losses and entropy
critic_losses.append(critic_loss.detach().cpu().numpy())
actor_losses.append(actor_loss.detach().cpu().numpy())
entropies.append(entropy.detach().mean().cpu().numpy())
# %%
# Plotting
# --------
#
""" plot the results """
# %matplotlib inline
rolling_length = 20
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(12, 5))
fig.suptitle(
f"Training plots for {agent.__class__.__name__} in the LunarLander-v2 environment \n \
(n_envs={n_envs}, n_steps_per_update={n_steps_per_update}, randomize_domain={randomize_domain})"
)
# episode return
axs[0][0].set_title("Episode Returns")
episode_returns_moving_average = (
np.convolve(
np.array(envs_wrapper.return_queue).flatten(),
np.ones(rolling_length),
mode="valid",
)
/ rolling_length
)
axs[0][0].plot(
np.arange(len(episode_returns_moving_average)) / n_envs,
episode_returns_moving_average,
)
axs[0][0].set_xlabel("Number of episodes")
# entropy
axs[1][0].set_title("Entropy")
entropy_moving_average = (
np.convolve(np.array(entropies), np.ones(rolling_length), mode="valid")
/ rolling_length
)
axs[1][0].plot(entropy_moving_average)
axs[1][0].set_xlabel("Number of updates")
# critic loss
axs[0][1].set_title("Critic Loss")
critic_losses_moving_average = (
np.convolve(
np.array(critic_losses).flatten(), np.ones(rolling_length), mode="valid"
)
/ rolling_length
)
axs[0][1].plot(critic_losses_moving_average)
axs[0][1].set_xlabel("Number of updates")
# actor loss
axs[1][1].set_title("Actor Loss")
actor_losses_moving_average = (
np.convolve(np.array(actor_losses).flatten(), np.ones(rolling_length), mode="valid")
/ rolling_length
)
axs[1][1].plot(actor_losses_moving_average)
axs[1][1].set_xlabel("Number of updates")
plt.tight_layout()
plt.show()
# %%
# .. image:: /_static/img/tutorials/vector_env_a2c_training_plots.png
# :alt: training_plots
#
# %%
# Performance Analysis of Synchronous and Asynchronous Vectorized Environments
# ----------------------------------------------------------------------------
#
# %%
#
# ------------------------------
#
# Asynchronous environments can lead to quicker training times and a higher speedup
# for data collection compared to synchronous environments. This is because asynchronous environments
# allow the sub-environments to step in parallel (in separate processes),
# while synchronous environments step their sub-environments one after another.
# This results in better efficiency and faster training times for asynchronous environments.
#
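# A rough way to compare the two yourself (an illustrative sketch, not the
# benchmark used for the plots below) is to time one sampling phase with
# ``AsyncVectorEnv`` versus ``SyncVectorEnv``:
#
# .. code:: python
#
#     import time
#
#     for vec_cls in (gym.vector.AsyncVectorEnv, gym.vector.SyncVectorEnv):
#         test_envs = vec_cls([lambda: gym.make("LunarLander-v2") for _ in range(n_envs)])
#         test_envs.reset(seed=42)
#         start = time.perf_counter()
#         for _ in range(n_steps_per_update):
#             test_envs.step(test_envs.action_space.sample())
#         print(vec_cls.__name__, time.perf_counter() - start)
#         test_envs.close()
#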
# %%
# .. image:: /_static/img/tutorials/vector_env_performance_plots.png
# :alt: performance_plots
#
# %%
#
# ------------------------------
#
# According to the Karp-Flatt metric (a metric used in parallel computing to estimate the limit for the
# speedup when scaling up the number of parallel processes, here the number of environments),
# the estimated max. speedup for asynchronous environments is 57, while the estimated maximum speedup
# for synchronous environments is 21. This suggests that asynchronous environments have significantly
# faster training times compared to synchronous environments (see graphs).
#
# %%
# .. image:: /_static/img/tutorials/vector_env_karp_flatt_plot.png
# :alt: karp_flatt_metric
#
# %%
#
# ------------------------------
#
# However, it is important to note that increasing the number of parallel vector environments
# can lead to slower training times after a certain number of environments (see plot below, where the
# agent was trained until the mean training returns were above -120). The slower training times might occur
# because the gradient estimates are already good enough after a relatively low number of environments
# (especially if the environment is not very complex). In this case, increasing the number of environments
# does not increase the learning speed, and actually increases the runtime, possibly due to the additional time
# needed to calculate the gradients. For LunarLander-v2, the best performing configuration used an AsyncVectorEnv
# with 10 parallel environments, but environments with a higher complexity may require more
# parallel environments to achieve optimal performance.
#
# %%
# .. image:: /_static/img/tutorials/vector_env_runtime_until_threshold.png
# :alt: runtime_until_threshold_plot
#
# %%
# Saving/ Loading Weights
# -----------------------
#
save_weights = False
load_weights = False
actor_weights_path = "weights/actor_weights.h5"
critic_weights_path = "weights/critic_weights.h5"
if not os.path.exists("weights"):
os.mkdir("weights")
""" save network weights """
if save_weights:
torch.save(agent.actor.state_dict(), actor_weights_path)
torch.save(agent.critic.state_dict(), critic_weights_path)
""" load network weights """
if load_weights:
agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)
agent.actor.load_state_dict(torch.load(actor_weights_path))
agent.critic.load_state_dict(torch.load(critic_weights_path))
agent.actor.eval()
agent.critic.eval()
# %%
# Showcase the Agent
# ------------------
#
""" play a couple of showcase episodes """
n_showcase_episodes = 3
for episode in range(n_showcase_episodes):
print(f"starting episode {episode}...")
# create a new sample environment to get new random parameters
if randomize_domain:
env = gym.make(
"LunarLander-v2",
render_mode="human",
gravity=np.clip(
np.random.normal(loc=-10.0, scale=2.0), a_min=-11.99, a_max=-0.01
),
enable_wind=np.random.choice([True, False]),
wind_power=np.clip(
np.random.normal(loc=15.0, scale=2.0), a_min=0.01, a_max=19.99
),
turbulence_power=np.clip(
np.random.normal(loc=1.5, scale=1.0), a_min=0.01, a_max=1.99
),
max_episode_steps=500,
)
else:
env = gym.make("LunarLander-v2", render_mode="human", max_episode_steps=500)
# get an initial state
state, info = env.reset()
# play one episode
done = False
while not done:
# select an action A_{t} using S_{t} as input for the agent
with torch.no_grad():
action, _, _, _ = agent.select_action(state[None, :])
# perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
state, reward, terminated, truncated, info = env.step(action.item())
# update if the environment is done
done = terminated or truncated
env.close()
# %%
# Try playing the environment yourself
# ------------------------------------
#
# from gymnasium.utils.play import play
#
# play(gym.make('LunarLander-v2', render_mode='rgb_array'),
# keys_to_action={'w': 2, 'a': 1, 'd': 3}, noop=0)
# %%
# References
# ----------
#
# [1] V. Mnih, A. P. Badia, M. Mirza, A. Graves, T. P. Lillicrap, T. Harley, D. Silver, K. Kavukcuoglu. "Asynchronous Methods for Deep Reinforcement Learning" ICML (2016).
#
# [2] J. Schulman, P. Moritz, S. Levine, M. Jordan and P. Abbeel. "High-dimensional continuous control using generalized advantage estimation." ICLR (2016).
#
# [3] Gymnasium Documentation: Vector environments. (URL: https://gymnasium.farama.org/api/vector/)