Compare commits

..

1 Commits

Author SHA1 Message Date
JongGyun Kim
fc0c43b199 RNN support for PPO2 (#859)
* initial implementaion of ppo2_rnn.

* set lstm memory as tf.GraphKeys.LOCAL_VARIABLES.

* replace dones with tf.placeholder_with_default.

* improves for 'play' option.

* removed unnecessary TODO .

* improve lstm code.

* move learning rate placeholer to optimizer scope.

* support the microbatched model.

* sync cnn lstm layer with originals.

* add cnn_lnlstm layer.

* fix a case when `states` is None.

* add initial_state variable to help test.

* make ppo2 rnn test available.

* rename 'obs' with 'observations'.
rename 'transition' with 'transitions'.
fix forgetting `dones` in the replay buffer.
fix a misuse of `states` and `next_states` in the replay buffer.

* make initialization once.
make `test_fixed_sequence` compatible with ppo2.

* adjust input shape.

* fix checking of a model input args in `simple_test` function.

* disable warning on purpose.

* support the play.

* improve scopes to compatible with multiple models (i.e, other tensorflow global/local variables)

* clean the scope of ppo2 policy model.

* name the memory variable of PPO RNNs more describly

* wrap the initializations in ppo2.

* remove redundant lines.

* update `REAMD.md`.

* add RNN layers.

* add the result of HalfCheeta-v2 env  experiment.

* correct a typo.

* add RNN class.

* rename `nlstm` with `num_units` in RNN builder functions.

* remove state saving.

* reuse RNNs in a2c.utils.

* revert baselines/run.py.

* replace `ppo2.step()` with original interface.

* revert `baselines/common/tests/util.py`.

* remove redundant lines.

* revert `baselines/common/test/util.py` to b875fb7.

* remove `states` variable.

* move RNN class to `baselines/ppo2/layers.py' and revert `baselines/common/models.py` to 858afa8.

* rename `model.step_as_dict` with `model.step_with_dict`.

* removed `ppo_lstm_mlp`.

* fix 02e26fd.
2019-04-26 15:17:56 -07:00
64 changed files with 951 additions and 878 deletions

View File

@@ -11,4 +11,4 @@ install:
script:
- flake8 . --show-source --statistics
- docker run -e RUNSLOW=1 baselines-test pytest -v .
- docker run baselines-test pytest -v .

View File

@@ -11,7 +11,7 @@ WORKDIR $CODE_DIR/baselines
# Clean up pycache and pyc files
RUN rm -rf __pycache__ && \
find . -name "*.pyc" -delete && \
pip install 'tensorflow < 2' && \
pip install tensorflow && \
pip install -e .[test]

View File

@@ -1,4 +1,4 @@
**Status:** Maintenance (expect bug fixes and minor updates)
**Status:** Active (under active development, breaking changes may occur)
<img src="data/logo.jpg" width=25% align="right" /> [![Build status](https://travis-ci.org/openai/baselines.svg?branch=master)](https://travis-ci.org/openai/baselines)
@@ -39,24 +39,21 @@ To activate a virtualenv:
More thorough tutorial on virtualenvs and options can be found [here](https://virtualenv.pypa.io/en/stable/)
## Tensorflow versions
The master branch supports Tensorflow from version 1.4 to 1.14. For Tensorflow 2.0 support, please use tf2 branch.
## Installation
- Clone the repo and cd into it:
```bash
git clone https://github.com/openai/baselines.git
cd baselines
```
- If you don't have TensorFlow installed already, install your favourite flavor of TensorFlow. In most cases, you may use
- If you don't have TensorFlow installed already, install your favourite flavor of TensorFlow. In most cases,
```bash
pip install tensorflow-gpu==1.14 # if you have a CUDA-compatible gpu and proper drivers
pip install tensorflow-gpu # if you have a CUDA-compatible gpu and proper drivers
```
or
```bash
pip install tensorflow==1.14
pip install tensorflow
```
to install Tensorflow 1.14, which is the latest version of Tensorflow supported by the master branch. Refer to [TensorFlow installation guide](https://www.tensorflow.org/install/)
should be sufficient. Refer to [TensorFlow installation guide](https://www.tensorflow.org/install/)
for more details.
- Install baselines package
@@ -101,8 +98,6 @@ python -m baselines.run --alg=deepq --env=PongNoFrameskip-v4 --num_timesteps=1e6
```
## Saving, loading and visualizing models
### Saving and loading the model
The algorithms serialization API is not properly unified yet; however, there is a simple method to save / restore trained models.
`--save_path` and `--load_path` command-line option loads the tensorflow state from a given path before training, and saves it after the training, respectively.
Let's imagine you'd like to train ppo2 on Atari Pong, save the model and then later visualize what has it learnt.
@@ -114,19 +109,10 @@ This should get to the mean reward per episode about 20. To load and visualize t
python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4 --num_timesteps=0 --load_path=~/models/pong_20M_ppo2 --play
```
*NOTE:* Mujoco environments require normalization to work properly, so we wrap them with VecNormalize wrapper. Currently, to ensure the models are saved with normalization (so that trained models can be restored and run without further training) the normalization coefficients are saved as tensorflow variables. This can decrease the performance somewhat, so if you require high-throughput steps with Mujoco and do not need saving/restoring the models, it may make sense to use numpy normalization instead. To do that, set 'use_tf=False` in [baselines/run.py](baselines/run.py#L116).
*NOTE:* At the moment Mujoco training uses VecNormalize wrapper for the environment which is not being saved correctly; so loading the models trained on Mujoco will not work well if the environment is recreated. If necessary, you can work around that by replacing RunningMeanStd by TfRunningMeanStd in [baselines/common/vec_env/vec_normalize.py](baselines/common/vec_env/vec_normalize.py#L12). This way, mean and std of environment normalizing wrapper will be saved in tensorflow variables and included in the model file; however, training is slower that way - hence not including it by default
### Logging and vizualizing learning curves and other training metrics
By default, all summary data, including progress, standard output, is saved to a unique directory in a temp folder, specified by a call to Python's [tempfile.gettempdir()](https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir).
The directory can be changed with the `--log_path` command-line option.
```bash
python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4 --num_timesteps=2e7 --save_path=~/models/pong_20M_ppo2 --log_path=~/logs/Pong/
```
*NOTE:* Please be aware that the logger will overwrite files of the same name in an existing directory, thus it's recommended that folder names be given a unique timestamp to prevent overwritten logs.
Another way the temp directory can be changed is through the use of the `$OPENAI_LOGDIR` environment variable.
For examples on how to load and display the training data, see [here](docs/viz/viz.ipynb).
## Loading and vizualizing learning curves and other training metrics
See [here](docs/viz/viz.ipynb) for instructions on how to load and display the training data.
## Subpackages

View File

@@ -6,7 +6,7 @@ from baselines import logger
from baselines.common import set_global_seeds
from baselines.common.policies import build_policy
from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.common.tf_util import get_session, save_variables
from baselines.common.vec_env.vec_frame_stack import VecFrameStack
from baselines.a2c.utils import batch_to_seq, seq_to_batch
@@ -216,8 +216,7 @@ class Model(object):
self.train = train
self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess)
self.save = functools.partial(save_variables, sess=sess, variables=params)
self.train_model = train_model
self.step_model = step_model
self._step = _step
@@ -359,9 +358,6 @@ def learn(network, env, seed=None, nsteps=20, total_timesteps=int(80e6), q_coef=
total_timesteps=total_timesteps, lrschedule=lrschedule, c=c,
trust_region=trust_region, alpha=alpha, delta=delta)
if load_path is not None:
model.load(load_path)
runner = Runner(env=env, model=model, nsteps=nsteps)
if replay_ratio > 0:
buffer = Buffer(env=env, nsteps=nsteps, size=buffer_size)

View File

@@ -92,7 +92,7 @@ class Model(object):
self.initial_state = step_model.initial_state
tf.global_variables_initializer().run(session=sess)
def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=100, nprocs=32, nsteps=20,
def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=1, nprocs=32, nsteps=20,
ent_coef=0.01, vf_coef=0.5, vf_fisher_coef=1.0, lr=0.25, max_grad_norm=0.5,
kfac_clip=0.001, save_interval=None, lrschedule='linear', load_path=None, is_async=True, **network_kwargs):
set_global_seeds(seed)

View File

@@ -11,7 +11,7 @@ KFAC_DEBUG = False
class KfacOptimizer():
# note that KfacOptimizer will be truly synchronous (and thus deterministic) only if a single-threaded session is used
def __init__(self, learning_rate=0.01, momentum=0.9, clip_kl=0.01, kfac_update=2, stats_accum_iter=60, full_stats_init=False, cold_iter=100, cold_lr=None, is_async=False, async_stats=False, epsilon=1e-2, stats_decay=0.95, blockdiag_bias=False, channel_fac=False, factored_damping=False, approxT2=False, use_float64=False, weight_decay_dict={},max_grad_norm=0.5):
self.max_grad_norm = max_grad_norm
self._lr = learning_rate

View File

@@ -1,3 +1,2 @@
# flake8: noqa F403
from baselines.bench.benchmarks import *
from baselines.bench.monitor import *

View File

@@ -1,4 +1,5 @@
import re
import os.path as osp
import os
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

View File

@@ -1,11 +1,13 @@
__all__ = ['Monitor', 'get_monitor_files', 'load_results']
import gym
from gym.core import Wrapper
import time
from glob import glob
import csv
import os.path as osp
import json
import numpy as np
class Monitor(Wrapper):
EXT = "monitor.csv"
@@ -160,3 +162,27 @@ def load_results(dir):
df['t'] -= min(header['t_start'] for header in headers)
df.headers = headers # HACK to preserve backwards compatibility
return df
def test_monitor():
env = gym.make("CartPole-v1")
env.seed(0)
mon_file = "/tmp/baselines-test-%s.monitor.csv" % uuid.uuid4()
menv = Monitor(env, mon_file)
menv.reset()
for _ in range(1000):
_, _, done, _ = menv.step(0)
if done:
menv.reset()
f = open(mon_file, 'rt')
firstline = f.readline()
assert firstline.startswith('#')
metadata = json.loads(firstline[1:])
assert metadata['env_id'] == "CartPole-v1"
assert set(metadata.keys()) == {'env_id', 'gym_version', 't_start'}, "Incorrect keys in monitor metadata"
last_logline = pandas.read_csv(f, index_col=None)
assert set(last_logline.keys()) == {'l', 't', 'r'}, "Incorrect keys in monitor logline"
f.close()
os.remove(mon_file)

View File

@@ -1,31 +0,0 @@
from .monitor import Monitor
import gym
import json
def test_monitor():
import pandas
import os
import uuid
env = gym.make("CartPole-v1")
env.seed(0)
mon_file = "/tmp/baselines-test-%s.monitor.csv" % uuid.uuid4()
menv = Monitor(env, mon_file)
menv.reset()
for _ in range(1000):
_, _, done, _ = menv.step(0)
if done:
menv.reset()
f = open(mon_file, 'rt')
firstline = f.readline()
assert firstline.startswith('#')
metadata = json.loads(firstline[1:])
assert metadata['env_id'] == "CartPole-v1"
assert set(metadata.keys()) == {'env_id', 't_start'}, "Incorrect keys in monitor metadata"
last_logline = pandas.read_csv(f, index_col=None)
assert set(last_logline.keys()) == {'l', 't', 'r'}, "Incorrect keys in monitor logline"
f.close()
os.remove(mon_file)

View File

@@ -130,60 +130,27 @@ class ClipRewardEnv(gym.RewardWrapper):
"""Bin reward to {+1, 0, -1} by its sign."""
return np.sign(reward)
class WarpFrame(gym.ObservationWrapper):
def __init__(self, env, width=84, height=84, grayscale=True, dict_space_key=None):
"""
Warp frames to 84x84 as done in the Nature paper and later work.
If the environment uses dictionary observations, `dict_space_key` can be specified which indicates which
observation should be warped.
"""
super().__init__(env)
self._width = width
self._height = height
self._grayscale = grayscale
self._key = dict_space_key
if self._grayscale:
num_colors = 1
def __init__(self, env, width=84, height=84, grayscale=True):
"""Warp frames to 84x84 as done in the Nature paper and later work."""
gym.ObservationWrapper.__init__(self, env)
self.width = width
self.height = height
self.grayscale = grayscale
if self.grayscale:
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 1), dtype=np.uint8)
else:
num_colors = 3
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 3), dtype=np.uint8)
new_space = gym.spaces.Box(
low=0,
high=255,
shape=(self._height, self._width, num_colors),
dtype=np.uint8,
)
if self._key is None:
original_space = self.observation_space
self.observation_space = new_space
else:
original_space = self.observation_space.spaces[self._key]
self.observation_space.spaces[self._key] = new_space
assert original_space.dtype == np.uint8 and len(original_space.shape) == 3
def observation(self, obs):
if self._key is None:
frame = obs
else:
frame = obs[self._key]
if self._grayscale:
def observation(self, frame):
if self.grayscale:
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(
frame, (self._width, self._height), interpolation=cv2.INTER_AREA
)
if self._grayscale:
frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
if self.grayscale:
frame = np.expand_dims(frame, -1)
if self._key is None:
obs = frame
else:
obs = obs.copy()
obs[self._key] = frame
return obs
return frame
class FrameStack(gym.Wrapper):
def __init__(self, env, k):
@@ -254,13 +221,6 @@ class LazyFrames(object):
return len(self._force())
def __getitem__(self, i):
return self._force()[i]
def count(self):
frames = self._force()
return frames.shape[frames.ndim - 1]
def frame(self, i):
return self._force()[..., i]
def make_atari(env_id, max_episode_steps=None):

View File

@@ -9,7 +9,7 @@ except ImportError:
MPI = None
import gym
from gym.wrappers import FlattenObservation, FilterObservation
from gym.wrappers import FlattenDictWrapper
from baselines import logger
from baselines.bench import Monitor
from baselines.common import set_global_seeds
@@ -17,26 +17,21 @@ from baselines.common.atari_wrappers import make_atari, wrap_deepmind
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.common import retro_wrappers
from baselines.common.wrappers import ClipActionsWrapper
def make_vec_env(env_id, env_type, num_env, seed,
wrapper_kwargs=None,
env_kwargs=None,
start_index=0,
reward_scale=1.0,
flatten_dict_observations=True,
gamestate=None,
initializer=None,
force_dummy=False):
gamestate=None):
"""
Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo.
"""
wrapper_kwargs = wrapper_kwargs or {}
env_kwargs = env_kwargs or {}
mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
seed = seed + 10000 * mpi_rank if seed is not None else None
logger_dir = logger.get_dir()
def make_thunk(rank, initializer=None):
def make_thunk(rank):
return lambda: make_env(
env_id=env_id,
env_type=env_type,
@@ -47,30 +42,18 @@ def make_vec_env(env_id, env_type, num_env, seed,
gamestate=gamestate,
flatten_dict_observations=flatten_dict_observations,
wrapper_kwargs=wrapper_kwargs,
env_kwargs=env_kwargs,
logger_dir=logger_dir,
initializer=initializer
logger_dir=logger_dir
)
set_global_seeds(seed)
if not force_dummy and num_env > 1:
return SubprocVecEnv([make_thunk(i + start_index, initializer=initializer) for i in range(num_env)])
if num_env > 1:
return SubprocVecEnv([make_thunk(i + start_index) for i in range(num_env)])
else:
return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)])
return DummyVecEnv([make_thunk(start_index)])
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None, initializer=None):
if initializer is not None:
initializer(mpi_rank=mpi_rank, subrank=subrank)
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, logger_dir=None):
wrapper_kwargs = wrapper_kwargs or {}
env_kwargs = env_kwargs or {}
if ':' in env_id:
import re
import importlib
module_name = re.sub(':.*','',env_id)
env_id = re.sub('.*:', '', env_id)
importlib.import_module(module_name)
if env_type == 'atari':
env = make_atari(env_id)
elif env_type == 'retro':
@@ -78,17 +61,17 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.
gamestate = gamestate or retro.State.DEFAULT
env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
else:
env = gym.make(env_id, **env_kwargs)
env = gym.make(env_id)
if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):
env = FlattenObservation(env)
keys = env.observation_space.spaces.keys()
env = gym.wrappers.FlattenDictWrapper(env, dict_keys=list(keys))
env.seed(seed + subrank if seed is not None else None)
env = Monitor(env,
logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)),
allow_early_resets=True)
if env_type == 'atari':
env = wrap_deepmind(env, **wrapper_kwargs)
elif env_type == 'retro':
@@ -96,9 +79,6 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.
wrapper_kwargs['frame_stack'] = 1
env = retro_wrappers.wrap_deepmind_retro(env, **wrapper_kwargs)
if isinstance(env.action_space, gym.spaces.Box):
env = ClipActionsWrapper(env)
if reward_scale != 1:
env = retro_wrappers.RewardScaler(env, reward_scale)
@@ -127,7 +107,7 @@ def make_robotics_env(env_id, seed, rank=0):
"""
set_global_seeds(seed)
env = gym.make(env_id)
env = FlattenObservation(FilterObservation(env, ['observation', 'desired_goal']))
env = FlattenDictWrapper(env, ['observation', 'desired_goal'])
env = Monitor(
env, logger.get_dir() and os.path.join(logger.get_dir(), str(rank)),
info_keywords=('is_success',))
@@ -169,7 +149,6 @@ def common_arg_parser():
parser.add_argument('--save_path', help='Path to save trained model to', default=None, type=str)
parser.add_argument('--save_video_interval', help='Save video every x steps (0 = disabled)', default=0, type=int)
parser.add_argument('--save_video_length', help='Length of recorded video. Default: 200', default=200, type=int)
parser.add_argument('--log_path', help='Directory to save learning curve data.', default=None, type=str)
parser.add_argument('--play', default=False, action='store_true')
return parser
@@ -186,7 +165,7 @@ def robotics_arg_parser():
def parse_unknown_args(args):
"""
Parse arguments not consumed by arg parser into a dictionary
Parse arguments not consumed by arg parser into a dicitonary
"""
retval = {}
preceded_by_key = False

View File

@@ -3,6 +3,7 @@ import tensorflow as tf
from baselines.a2c import utils
from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch
from baselines.common.mpi_running_mean_std import RunningMeanStd
import tensorflow.contrib.layers as layers
mapping = {}
@@ -25,51 +26,6 @@ def nature_cnn(unscaled_images, **conv_kwargs):
h3 = conv_to_fc(h3)
return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2)))
def build_impala_cnn(unscaled_images, depths=[16,32,32], **conv_kwargs):
"""
Model used in the paper "IMPALA: Scalable Distributed Deep-RL with
Importance Weighted Actor-Learner Architectures" https://arxiv.org/abs/1802.01561
"""
layer_num = 0
def get_layer_num_str():
nonlocal layer_num
num_str = str(layer_num)
layer_num += 1
return num_str
def conv_layer(out, depth):
return tf.layers.conv2d(out, depth, 3, padding='same', name='layer_' + get_layer_num_str())
def residual_block(inputs):
depth = inputs.get_shape()[-1].value
out = tf.nn.relu(inputs)
out = conv_layer(out, depth)
out = tf.nn.relu(out)
out = conv_layer(out, depth)
return out + inputs
def conv_sequence(inputs, depth):
out = conv_layer(inputs, depth)
out = tf.layers.max_pooling2d(out, pool_size=3, strides=2, padding='same')
out = residual_block(out)
out = residual_block(out)
return out
out = tf.cast(unscaled_images, tf.float32) / 255.
for depth in depths:
out = conv_sequence(out, depth)
out = tf.layers.flatten(out)
out = tf.nn.relu(out)
out = tf.layers.dense(out, 256, activation=tf.nn.relu, name='layer_' + get_layer_num_str())
return out
@register("mlp")
def mlp(num_layers=2, num_hidden=64, activation=tf.tanh, layer_norm=False):
@@ -109,11 +65,6 @@ def cnn(**conv_kwargs):
return nature_cnn(X, **conv_kwargs)
return network_fn
@register("impala_cnn")
def impala_cnn(**conv_kwargs):
def network_fn(X):
return build_impala_cnn(X)
return network_fn
@register("cnn_small")
def cnn_small(**conv_kwargs):
@@ -128,6 +79,7 @@ def cnn_small(**conv_kwargs):
return h
return network_fn
@register("lstm")
def lstm(nlstm=128, layer_norm=False):
"""
@@ -184,12 +136,12 @@ def lstm(nlstm=128, layer_norm=False):
@register("cnn_lstm")
def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs):
def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs):
def network_fn(X, nenv=1):
nbatch = X.shape[0]
nsteps = nbatch // nenv
h = conv_fn(X, **conv_kwargs)
h = nature_cnn(X, **conv_kwargs)
M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1)
S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states
@@ -209,9 +161,6 @@ def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs):
return network_fn
@register("impala_cnn_lstm")
def impala_cnn_lstm():
return cnn_lstm(nlstm=256, conv_fn=build_impala_cnn)
@register("cnn_lnlstm")
def cnn_lnlstm(nlstm=128, **conv_kwargs):
@@ -238,7 +187,7 @@ def conv_only(convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], **conv_kwargs):
out = tf.cast(X, tf.float32) / 255.
with tf.variable_scope("convnet"):
for num_outputs, kernel_size, stride in convs:
out = tf.contrib.layers.convolution2d(out,
out = layers.convolution2d(out,
num_outputs=num_outputs,
kernel_size=kernel_size,
stride=stride,

View File

@@ -2,7 +2,6 @@ import numpy as np
import tensorflow as tf
from baselines.common import tf_util as U
from baselines.common.tests.test_with_mpi import with_mpi
from baselines import logger
try:
from mpi4py import MPI
except ImportError:
@@ -10,34 +9,22 @@ except ImportError:
class MpiAdamOptimizer(tf.train.AdamOptimizer):
"""Adam optimizer that averages gradients across mpi processes."""
def __init__(self, comm, grad_clip=None, mpi_rank_weight=1, **kwargs):
def __init__(self, comm, **kwargs):
self.comm = comm
self.grad_clip = grad_clip
self.mpi_rank_weight = mpi_rank_weight
tf.train.AdamOptimizer.__init__(self, **kwargs)
def compute_gradients(self, loss, var_list, **kwargs):
grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs)
grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None]
flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) * self.mpi_rank_weight
flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0)
shapes = [v.shape.as_list() for g, v in grads_and_vars]
sizes = [int(np.prod(s)) for s in shapes]
total_weight = np.zeros(1, np.float32)
self.comm.Allreduce(np.array([self.mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)
total_weight = total_weight[0]
num_tasks = self.comm.Get_size()
buf = np.zeros(sum(sizes), np.float32)
countholder = [0] # Counts how many times _collect_grads has been called
stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable
def _collect_grads(flat_grad, np_stat):
if self.grad_clip is not None:
gradnorm = np.linalg.norm(flat_grad)
if gradnorm > 1:
flat_grad /= gradnorm
logger.logkv_mean('gradnorm', gradnorm)
logger.logkv_mean('gradclipfrac', float(gradnorm > 1))
self.comm.Allreduce(flat_grad, buf, op=MPI.SUM)
np.divide(buf, float(total_weight), out=buf)
np.divide(buf, float(num_tasks), out=buf)
if countholder[0] % 100 == 0:
check_synced(np_stat, self.comm)
countholder[0] += 1
@@ -64,8 +51,8 @@ def check_synced(localval, comm=None):
comm = comm or MPI.COMM_WORLD
vals = comm.gather(localval)
if comm.rank == 0:
assert all(val==vals[0] for val in vals[1:]),\
'MpiAdamOptimizer detected that different workers have different weights: {}'.format(vals)
assert all(val==vals[0] for val in vals[1:])
@with_mpi(timeout=5)
def test_nonfreeze():
@@ -88,3 +75,4 @@ def test_nonfreeze():
l,_ = sess.run([loss, update_op])
print(i, l)
losslist_ref.append(l)

View File

@@ -12,9 +12,8 @@ def mpi_mean(x, axis=0, comm=None, keepdims=False):
localsum = np.zeros(n+1, x.dtype)
localsum[:n] = xsum.ravel()
localsum[n] = x.shape[axis]
# globalsum = np.zeros_like(localsum)
# comm.Allreduce(localsum, globalsum, op=MPI.SUM)
globalsum = comm.allreduce(localsum, op=MPI.SUM)
globalsum = np.zeros_like(localsum)
comm.Allreduce(localsum, globalsum, op=MPI.SUM)
return globalsum[:n].reshape(xsum.shape) / globalsum[n], globalsum[n]
def mpi_moments(x, axis=0, comm=None, keepdims=False):

View File

@@ -90,8 +90,6 @@ def one_sided_ema(xolds, yolds, low=None, high=None, n=512, decay_steps=1., low_
sum_y *= interstep_decay
count_y *= interstep_decay
while True:
if luoi >= len(xolds):
break
xold = xolds[luoi]
if xold <= xnew:
decay = np.exp(- (xnew - xold) / decay_period)
@@ -100,6 +98,8 @@ def one_sided_ema(xolds, yolds, low=None, high=None, n=512, decay_steps=1., low_
luoi += 1
else:
break
if luoi >= len(xolds):
break
sum_ys[i] = sum_y
count_ys[i] = count_y
@@ -248,10 +248,7 @@ def plot_results(
figsize=None,
legend_outside=False,
resample=0,
smooth_step=1.0,
tiling='vertical',
xlabel=None,
ylabel=None
smooth_step=1.0
):
'''
Plot multiple Results objects
@@ -303,23 +300,9 @@ def plot_results(
sk2r[splitkey].append(result)
assert len(sk2r) > 0
assert isinstance(resample, int), "0: don't resample. <integer>: that many samples"
if tiling == 'vertical' or tiling is None:
nrows = len(sk2r)
ncols = 1
elif tiling == 'horizontal':
ncols = len(sk2r)
nrows = 1
elif tiling == 'symmetric':
import math
N = len(sk2r)
largest_divisor = 1
for i in range(1, int(math.sqrt(N))+1):
if N % i == 0:
largest_divisor = i
ncols = largest_divisor
nrows = N // ncols
figsize = figsize or (6 * ncols, 6 * nrows)
nrows = len(sk2r)
ncols = 1
figsize = figsize or (6, 6 * nrows)
f, axarr = plt.subplots(nrows, ncols, sharex=False, squeeze=False, figsize=figsize)
groups = list(set(group_fn(result) for result in allresults))
@@ -333,9 +316,7 @@ def plot_results(
g2c = defaultdict(int)
sresults = sk2r[sk]
gresults = defaultdict(list)
idx_row = isplit // ncols
idx_col = isplit % ncols
ax = axarr[idx_row][idx_col]
ax = axarr[isplit][0]
for result in sresults:
group = group_fn(result)
g2c[group] += 1
@@ -374,7 +355,7 @@ def plot_results(
ymean = np.mean(ys, axis=0)
ystd = np.std(ys, axis=0)
ystderr = ystd / np.sqrt(len(ys))
l, = axarr[idx_row][idx_col].plot(usex, ymean, color=color)
l, = axarr[isplit][0].plot(usex, ymean, color=color)
g2l[group] = l
if shaded_err:
ax.fill_between(usex, ymean - ystderr, ymean + ystderr, color=color, alpha=.4)
@@ -391,17 +372,6 @@ def plot_results(
loc=2 if legend_outside else None,
bbox_to_anchor=(1,1) if legend_outside else None)
ax.set_title(sk)
# add xlabels, but only to the bottom row
if xlabel is not None:
for ax in axarr[-1]:
plt.sca(ax)
plt.xlabel(xlabel)
# add ylabels, but only to left column
if ylabel is not None:
for ax in axarr[:,0]:
plt.sca(ax)
plt.ylabel(ylabel)
return f, axarr
def regression_analysis(df):

View File

@@ -1,13 +1,10 @@
from baselines.common import mpi_util
from baselines import logger
from baselines.common.tests.test_with_mpi import with_mpi
try:
from mpi4py import MPI
except ImportError:
MPI = None
from baselines.common import mpi_util
@with_mpi()
def test_mpi_weighted_mean():
from mpi4py import MPI
comm = MPI.COMM_WORLD
with logger.scoped_configure(comm=comm):
if comm.rank == 0:
@@ -16,6 +13,7 @@ def test_mpi_weighted_mean():
name2valcount = {'a' : (19, 1), 'c' : (42,3)}
else:
raise NotImplementedError
d = mpi_util.mpi_weighted_mean(comm, name2valcount)
correctval = {'a' : (10 * 2 + 19) / 3.0, 'b' : 20, 'c' : 42}
if comm.rank == 0:

View File

@@ -1,2 +0,0 @@
import os, pytest
mark_slow = pytest.mark.skipif(not os.getenv('RUNSLOW'), reason='slow')

View File

@@ -9,16 +9,18 @@ class FixedSequenceEnv(Env):
n_actions=10,
episode_len=100
):
self.np_random = np.random.RandomState()
self.sequence = None
self.action_space = Discrete(n_actions)
self.observation_space = Discrete(1)
self.np_random = np.random.RandomState(0)
self.episode_len = episode_len
self.sequence = [self.np_random.randint(0, self.action_space.n)
for _ in range(self.episode_len)]
self.time = 0
def reset(self):
if self.sequence is None:
self.sequence = [self.np_random.randint(0, self.action_space.n-1) for _ in range(self.episode_len)]
self.time = 0
return 0
@@ -27,6 +29,7 @@ class FixedSequenceEnv(Env):
self._choose_next_state()
done = False
if self.episode_len and self.time >= self.episode_len:
rew = 0
done = True
return 0, rew, done, {}

View File

@@ -2,45 +2,43 @@ import numpy as np
from abc import abstractmethod
from gym import Env
from gym.spaces import MultiDiscrete, Discrete, Box
from collections import deque
class IdentityEnv(Env):
def __init__(
self,
episode_len=None,
delay=0,
zero_first_rewards=True
episode_len=None
):
self.observation_space = self.action_space
self.episode_len = episode_len
self.time = 0
self.delay = delay
self.zero_first_rewards = zero_first_rewards
self.q = deque(maxlen=delay+1)
self.reset()
def reset(self):
self.q.clear()
for _ in range(self.delay + 1):
self.q.append(self.action_space.sample())
self._choose_next_state()
self.time = 0
return self.q[-1]
return self.state
def step(self, actions):
rew = self._get_reward(self.q.popleft(), actions)
if self.zero_first_rewards and self.time < self.delay:
rew = 0
self.q.append(self.action_space.sample())
self.time += 1
done = self.episode_len is not None and self.time >= self.episode_len
return self.q[-1], rew, done, {}
rew = self._get_reward(actions)
self._choose_next_state()
done = False
if self.episode_len and self.time >= self.episode_len:
done = True
return self.state, rew, done, {}
def seed(self, seed=None):
self.action_space.seed(seed)
def _choose_next_state(self):
self.state = self.action_space.sample()
self.time += 1
@abstractmethod
def _get_reward(self, state, actions):
def _get_reward(self, actions):
raise NotImplementedError
@@ -49,29 +47,26 @@ class DiscreteIdentityEnv(IdentityEnv):
self,
dim,
episode_len=None,
delay=0,
zero_first_rewards=True
):
self.action_space = Discrete(dim)
super().__init__(episode_len=episode_len, delay=delay, zero_first_rewards=zero_first_rewards)
super().__init__(episode_len=episode_len)
def _get_reward(self, state, actions):
return 1 if state == actions else 0
def _get_reward(self, actions):
return 1 if self.state == actions else 0
class MultiDiscreteIdentityEnv(IdentityEnv):
def __init__(
self,
dims,
episode_len=None,
delay=0,
):
self.action_space = MultiDiscrete(dims)
super().__init__(episode_len=episode_len, delay=delay)
super().__init__(episode_len=episode_len)
def _get_reward(self, state, actions):
return 1 if all(state == actions) else 0
def _get_reward(self, actions):
return 1 if all(self.state == actions) else 0
class BoxIdentityEnv(IdentityEnv):
@@ -84,7 +79,7 @@ class BoxIdentityEnv(IdentityEnv):
self.action_space = Box(low=-1.0, high=1.0, shape=shape, dtype=np.float32)
super().__init__(episode_len=episode_len)
def _get_reward(self, state, actions):
diff = actions - state
def _get_reward(self, actions):
diff = actions - self.state
diff = diff[:]
return -0.5 * np.dot(diff, diff)

View File

@@ -1,36 +0,0 @@
from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv
def test_discrete_nodelay():
nsteps = 100
eplen = 50
env = DiscreteIdentityEnv(10, episode_len=eplen)
ob = env.reset()
for t in range(nsteps):
action = env.action_space.sample()
next_ob, rew, done, info = env.step(action)
assert rew == (1 if action == ob else 0)
if (t + 1) % eplen == 0:
assert done
next_ob = env.reset()
else:
assert not done
ob = next_ob
def test_discrete_delay1():
eplen = 50
env = DiscreteIdentityEnv(10, episode_len=eplen, delay=1)
ob = env.reset()
prev_ob = None
for t in range(eplen):
action = env.action_space.sample()
next_ob, rew, done, info = env.step(action)
if t > 0:
assert rew == (1 if action == prev_ob else 0)
else:
assert rew == 0
prev_ob = ob
ob = next_ob
if t < eplen - 1:
assert not done
assert done

View File

@@ -3,7 +3,6 @@ import gym
from baselines.run import get_learn_function
from baselines.common.tests.util import reward_per_episode_test
from baselines.common.tests import mark_slow
common_kwargs = dict(
total_timesteps=30000,
@@ -21,7 +20,7 @@ learn_kwargs = {
'trpo_mpi': {}
}
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", learn_kwargs.keys())
def test_cartpole(alg):
'''

View File

@@ -3,7 +3,6 @@ import gym
from baselines.run import get_learn_function
from baselines.common.tests.util import reward_per_episode_test
from baselines.common.tests import mark_slow
pytest.importorskip('mujoco_py')
@@ -16,7 +15,7 @@ learn_kwargs = {
'her': dict(total_timesteps=2000)
}
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", learn_kwargs.keys())
def test_fetchreach(alg):
'''

View File

@@ -3,8 +3,6 @@ from baselines.common.tests.envs.fixed_sequence_env import FixedSequenceEnv
from baselines.common.tests.util import simple_test
from baselines.run import get_learn_function
from baselines.common.tests import mark_slow
common_kwargs = dict(
seed=0,
@@ -19,11 +17,11 @@ learn_kwargs = {
# 'trpo_mpi': lambda e, p: trpo_mpi.learn(policy_fn=p(env=e), env=e, max_timesteps=30000, timesteps_per_batch=100, cg_iters=10, gamma=0.9, lam=1.0, max_kl=0.001)
}
alg_list = learn_kwargs.keys()
rnn_list = ['lstm']
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", alg_list)
@pytest.mark.parametrize("rnn", rnn_list)
def test_fixed_sequence(alg, rnn):
@@ -35,6 +33,9 @@ def test_fixed_sequence(alg, rnn):
kwargs = learn_kwargs[alg]
kwargs.update(common_kwargs)
if alg == 'ppo2' and rnn.endswith('lstm'):
rnn = 'ppo_' + rnn
env_fn = lambda: FixedSequenceEnv(n_actions=10, episode_len=5)
learn = lambda e: get_learn_function(alg)(
env=e,
@@ -47,6 +48,3 @@ def test_fixed_sequence(alg, rnn):
if __name__ == '__main__':
test_fixed_sequence('ppo2', 'lstm')

View File

@@ -2,7 +2,6 @@ import pytest
from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv, BoxIdentityEnv, MultiDiscreteIdentityEnv
from baselines.run import get_learn_function
from baselines.common.tests.util import simple_test
from baselines.common.tests import mark_slow
common_kwargs = dict(
total_timesteps=30000,
@@ -25,7 +24,7 @@ algos_disc = ['a2c', 'acktr', 'deepq', 'ppo2', 'trpo_mpi']
algos_multidisc = ['a2c', 'acktr', 'ppo2', 'trpo_mpi']
algos_cont = ['a2c', 'acktr', 'ddpg', 'ppo2', 'trpo_mpi']
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", algos_disc)
def test_discrete_identity(alg):
'''
@@ -40,7 +39,7 @@ def test_discrete_identity(alg):
env_fn = lambda: DiscreteIdentityEnv(10, episode_len=100)
simple_test(env_fn, learn_fn, 0.9)
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", algos_multidisc)
def test_multidiscrete_identity(alg):
'''
@@ -55,7 +54,7 @@ def test_multidiscrete_identity(alg):
env_fn = lambda: MultiDiscreteIdentityEnv((3,3), episode_len=100)
simple_test(env_fn, learn_fn, 0.9)
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", algos_cont)
def test_continuous_identity(alg):
'''

View File

@@ -4,7 +4,7 @@ import pytest
from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.tests.util import simple_test
from baselines.run import get_learn_function
from baselines.common.tests import mark_slow
# TODO investigate a2c and ppo2 failures - is it due to bad hyperparameters for this problem?
# GitHub issue https://github.com/openai/baselines/issues/189
@@ -28,7 +28,7 @@ learn_args = {
#tests pass, but are too slow on travis. Same algorithms are covered
# by other tests with less compute-hungry nn's and by benchmarks
@pytest.mark.skip
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", learn_args.keys())
def test_mnist(alg):
'''

View File

@@ -1,17 +0,0 @@
# smoke tests of plot_util
from baselines.common import plot_util as pu
from baselines.common.tests.util import smoketest
def test_plot_util():
nruns = 4
logdirs = [smoketest('--alg=ppo2 --env=CartPole-v0 --num_timesteps=10000') for _ in range(nruns)]
data = pu.load_results(logdirs)
assert len(data) == 4
_, axes = pu.plot_results(data[:1]); assert len(axes) == 1
_, axes = pu.plot_results(data, tiling='vertical'); assert axes.shape==(4,1)
_, axes = pu.plot_results(data, tiling='horizontal'); assert axes.shape==(1,4)
_, axes = pu.plot_results(data, tiling='symmetric'); assert axes.shape==(2,2)
_, axes = pu.plot_results(data, split_fn=lambda _: ''); assert len(axes) == 1

View File

@@ -1,17 +1,16 @@
import os
import gym
import tempfile
import pytest
import tensorflow as tf
import numpy as np
from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.run import get_learn_function
from baselines.common.tf_util import make_session, get_session
from functools import partial
import gym
import numpy as np
import pytest
import tensorflow as tf
from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.tf_util import make_session, get_session
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.run import get_learn_function
learn_kwargs = {
'deepq': {},
@@ -37,12 +36,15 @@ def test_serialization(learn_fn, network_fn):
Test if the trained model can be serialized
'''
_network_kwargs = network_kwargs[network_fn]
if network_fn.endswith('lstm') and learn_fn in ['acer', 'acktr', 'trpo_mpi', 'deepq']:
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
elif network_fn.endswith('lstm') and learn_fn == 'ppo2':
network_fn = 'ppo_' + network_fn
def make_env():
env = MnistEnv(episode_len=100)
@@ -54,10 +56,9 @@ def test_serialization(learn_fn, network_fn):
learn = get_learn_function(learn_fn)
kwargs = {}
kwargs.update(network_kwargs[network_fn])
kwargs.update(_network_kwargs)
kwargs.update(learn_kwargs[learn_fn])
learn = partial(learn, env=env, network=network_fn, seed=0, **kwargs)
with tempfile.TemporaryDirectory() as td:
@@ -76,7 +77,7 @@ def test_serialization(learn_fn, network_fn):
for k, v in variables_dict1.items():
np.testing.assert_allclose(v, variables_dict2[k], atol=0.01,
err_msg='saved and loaded variable {} value mismatch'.format(k))
err_msg='saved and loaded variable {} value mismatch'.format(k))
np.testing.assert_allclose(mean1, mean2, atol=0.5)
np.testing.assert_allclose(std1, std2, atol=0.5)
@@ -90,15 +91,15 @@ def test_coexistence(learn_fn, network_fn):
'''
if learn_fn == 'deepq':
# TODO enable multiple DQN models to be useable at the same time
# github issue https://github.com/openai/baselines/issues/656
return
# TODO enable multiple DQN models to be useable at the same time
# github issue https://github.com/openai/baselines/issues/656
return
if network_fn.endswith('lstm') and learn_fn in ['acktr', 'trpo_mpi', 'deepq']:
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
env = DummyVecEnv([lambda: gym.make('CartPole-v0')])
learn = get_learn_function(learn_fn)
@@ -107,7 +108,7 @@ def test_coexistence(learn_fn, network_fn):
kwargs.update(network_kwargs[network_fn])
kwargs.update(learn_kwargs[learn_fn])
learn = partial(learn, env=env, network=network_fn, total_timesteps=0, **kwargs)
learn = partial(learn, env=env, network=network_fn, total_timesteps=0, **kwargs)
make_session(make_default=True, graph=tf.Graph())
model1 = learn(seed=1)
make_session(make_default=True, graph=tf.Graph())
@@ -117,7 +118,6 @@ def test_coexistence(learn_fn, network_fn):
model2.step(env.observation_space.sample())
def _serialize_variables():
sess = get_session()
variables = tf.trainable_variables()
@@ -136,4 +136,3 @@ def _get_action_stats(model, ob):
std = np.std(actions, axis=0)
return mean, std

View File

@@ -4,7 +4,6 @@ import subprocess
import cloudpickle
import base64
import pytest
from functools import wraps
try:
from mpi4py import MPI
@@ -13,7 +12,6 @@ except ImportError:
def with_mpi(nproc=2, timeout=30, skip_if_no_mpi=True):
def outer_thunk(fn):
@wraps(fn)
def thunk(*args, **kwargs):
serialized_fn = base64.b64encode(cloudpickle.dumps(lambda: fn(*args, **kwargs)))
subprocess.check_call([

View File

@@ -5,12 +5,6 @@ from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
N_TRIALS = 10000
N_EPISODES = 100
_sess_config = tf.ConfigProto(
allow_soft_placement=True,
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1
)
def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS):
def seeded_env_fn():
env = env_fn()
@@ -19,7 +13,7 @@ def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS):
np.random.seed(0)
env = DummyVecEnv([seeded_env_fn])
with tf.Graph().as_default(), tf.Session(config=_sess_config).as_default():
with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default():
tf.set_random_seed(0)
model = learn_fn(env)
sum_rew = 0
@@ -40,7 +34,7 @@ def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS):
def reward_per_episode_test(env_fn, learn_fn, min_avg_reward, n_trials=N_EPISODES):
env = DummyVecEnv([env_fn])
with tf.Graph().as_default(), tf.Session(config=_sess_config).as_default():
with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default():
model = learn_fn(env)
N_TRIALS = 100
observations, actions, rewards = rollout(env, model, N_TRIALS)
@@ -77,16 +71,3 @@ def rollout(env, model, n_trials):
observations.append(episode_obs)
return observations, actions, rewards
def smoketest(argstr, **kwargs):
import tempfile
import subprocess
import os
argstr = 'python -m baselines.run ' + argstr
for key, value in kwargs:
argstr += ' --{}={}'.format(key, value)
tempdir = tempfile.mkdtemp()
env = os.environ.copy()
env['OPENAI_LOGDIR'] = tempdir
subprocess.run(argstr.split(' '), env=env)
return tempdir

View File

@@ -70,11 +70,9 @@ class ShmemVecEnv(VecEnv):
assert len(actions) == len(self.parent_pipes)
for pipe, act in zip(self.parent_pipes, actions):
pipe.send(('step', act))
self.waiting_step = True
def step_wait(self):
outs = [pipe.recv() for pipe in self.parent_pipes]
self.waiting_step = False
obs, rews, dones, infos = zip(*outs)
return self._decode_obses(obs), np.array(rews), np.array(dones), infos

View File

@@ -4,36 +4,33 @@ import numpy as np
from .vec_env import VecEnv, CloudpickleWrapper, clear_mpi_env_vars
def worker(remote, parent_remote, env_fn_wrappers):
def step_env(env, action):
ob, reward, done, info = env.step(action)
if done:
ob = env.reset()
return ob, reward, done, info
def worker(remote, parent_remote, env_fn_wrapper):
parent_remote.close()
envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]
env = env_fn_wrapper.x()
try:
while True:
cmd, data = remote.recv()
if cmd == 'step':
remote.send([step_env(env, action) for env, action in zip(envs, data)])
ob, reward, done, info = env.step(data)
if done:
ob = env.reset()
remote.send((ob, reward, done, info))
elif cmd == 'reset':
remote.send([env.reset() for env in envs])
ob = env.reset()
remote.send(ob)
elif cmd == 'render':
remote.send([env.render(mode='rgb_array') for env in envs])
remote.send(env.render(mode='rgb_array'))
elif cmd == 'close':
remote.close()
break
elif cmd == 'get_spaces_spec':
remote.send(CloudpickleWrapper((envs[0].observation_space, envs[0].action_space, envs[0].spec)))
remote.send((env.observation_space, env.action_space, env.spec))
else:
raise NotImplementedError
except KeyboardInterrupt:
print('SubprocVecEnv worker: got KeyboardInterrupt')
finally:
for env in envs:
env.close()
env.close()
class SubprocVecEnv(VecEnv):
@@ -41,23 +38,17 @@ class SubprocVecEnv(VecEnv):
VecEnv that runs multiple environments in parallel in subproceses and communicates with them via pipes.
Recommended to use when num_envs > 1 and step() can be a bottleneck.
"""
def __init__(self, env_fns, spaces=None, context='spawn', in_series=1):
def __init__(self, env_fns, spaces=None, context='spawn'):
"""
Arguments:
env_fns: iterable of callables - functions that create environments to run in subprocesses. Need to be cloud-pickleable
in_series: number of environments to run in series in a single process
(e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series)
"""
self.waiting = False
self.closed = False
self.in_series = in_series
nenvs = len(env_fns)
assert nenvs % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
self.nremotes = nenvs // in_series
env_fns = np.array_split(env_fns, self.nremotes)
ctx = mp.get_context(context)
self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.nremotes)])
self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(nenvs)])
self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
for p in self.ps:
@@ -68,13 +59,12 @@ class SubprocVecEnv(VecEnv):
remote.close()
self.remotes[0].send(('get_spaces_spec', None))
observation_space, action_space, self.spec = self.remotes[0].recv().x
observation_space, action_space, self.spec = self.remotes[0].recv()
self.viewer = None
VecEnv.__init__(self, nenvs, observation_space, action_space)
VecEnv.__init__(self, len(env_fns), observation_space, action_space)
def step_async(self, actions):
self._assert_not_closed()
actions = np.array_split(actions, self.nremotes)
for remote, action in zip(self.remotes, actions):
remote.send(('step', action))
self.waiting = True
@@ -82,7 +72,6 @@ class SubprocVecEnv(VecEnv):
def step_wait(self):
self._assert_not_closed()
results = [remote.recv() for remote in self.remotes]
results = _flatten_list(results)
self.waiting = False
obs, rews, dones, infos = zip(*results)
return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos
@@ -91,9 +80,7 @@ class SubprocVecEnv(VecEnv):
self._assert_not_closed()
for remote in self.remotes:
remote.send(('reset', None))
obs = [remote.recv() for remote in self.remotes]
obs = _flatten_list(obs)
return _flatten_obs(obs)
return _flatten_obs([remote.recv() for remote in self.remotes])
def close_extras(self):
self.closed = True
@@ -110,7 +97,6 @@ class SubprocVecEnv(VecEnv):
for pipe in self.remotes:
pipe.send(('render', None))
imgs = [pipe.recv() for pipe in self.remotes]
imgs = _flatten_list(imgs)
return imgs
def _assert_not_closed(self):
@@ -129,10 +115,3 @@ def _flatten_obs(obs):
return {k: np.stack([o[k] for o in obs]) for k in keys}
else:
return np.stack(obs)
def _flatten_list(l):
assert isinstance(l, (list, tuple))
assert len(l) > 0
assert all([len(l_) > 0 for l_ in l])
return [l__ for l_ in l for l__ in l_]

View File

@@ -67,50 +67,6 @@ def test_vec_env(klass, dtype): # pylint: disable=R0914
assert_venvs_equal(env1, env2, num_steps=num_steps)
@pytest.mark.parametrize('dtype', ('uint8', 'float32'))
@pytest.mark.parametrize('num_envs_in_series', (3, 4, 6))
def test_sync_sampling(dtype, num_envs_in_series):
"""
Test that a SubprocVecEnv running with envs in series
outputs the same as DummyVecEnv.
"""
num_envs = 12
num_steps = 100
shape = (3, 8)
def make_fn(seed):
"""
Get an environment constructor with a seed.
"""
return lambda: SimpleEnv(seed, shape, dtype)
fns = [make_fn(i) for i in range(num_envs)]
env1 = DummyVecEnv(fns)
env2 = SubprocVecEnv(fns, in_series=num_envs_in_series)
assert_venvs_equal(env1, env2, num_steps=num_steps)
@pytest.mark.parametrize('dtype', ('uint8', 'float32'))
@pytest.mark.parametrize('num_envs_in_series', (3, 4, 6))
def test_sync_sampling_sanity(dtype, num_envs_in_series):
"""
Test that a SubprocVecEnv running with envs in series
outputs the same as SubprocVecEnv without running in series.
"""
num_envs = 12
num_steps = 100
shape = (3, 8)
def make_fn(seed):
"""
Get an environment constructor with a seed.
"""
return lambda: SimpleEnv(seed, shape, dtype)
fns = [make_fn(i) for i in range(num_envs)]
env1 = SubprocVecEnv(fns)
env2 = SubprocVecEnv(fns, in_series=num_envs_in_series)
assert_venvs_equal(env1, env2, num_steps=num_steps)
class SimpleEnv(gym.Env):
"""
An environment with a pre-determined observation space

View File

@@ -38,9 +38,6 @@ def obs_space_info(obs_space):
if isinstance(obs_space, gym.spaces.Dict):
assert isinstance(obs_space.spaces, OrderedDict)
subspaces = obs_space.spaces
elif isinstance(obs_space, gym.spaces.Tuple):
assert isinstance(obs_space.spaces, tuple)
subspaces = {i: obs_space.spaces[i] for i in range(len(obs_space.spaces))}
else:
subspaces = {None: obs_space}
keys = []

View File

@@ -145,7 +145,8 @@ class VecEnvWrapper(VecEnv):
def __init__(self, venv, observation_space=None, action_space=None):
self.venv = venv
super().__init__(num_envs=venv.num_envs,
VecEnv.__init__(self,
num_envs=venv.num_envs,
observation_space=observation_space or venv.observation_space,
action_space=action_space or venv.action_space)
@@ -169,11 +170,6 @@ class VecEnvWrapper(VecEnv):
def get_images(self):
return self.venv.get_images()
def __getattr__(self, name):
if name.startswith('_'):
raise AttributeError("attempted to get missing private attribute '{}'".format(name))
return getattr(self.venv, name)
class VecEnvObservationWrapper(VecEnvWrapper):
@abstractmethod
def process(self, obs):

View File

@@ -5,18 +5,16 @@ import time
from collections import deque
class VecMonitor(VecEnvWrapper):
def __init__(self, venv, filename=None, keep_buf=0, info_keywords=()):
def __init__(self, venv, filename=None, keep_buf=0):
VecEnvWrapper.__init__(self, venv)
self.eprets = None
self.eplens = None
self.epcount = 0
self.tstart = time.time()
if filename:
self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart},
extra_keys=info_keywords)
self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart})
else:
self.results_writer = None
self.info_keywords = info_keywords
self.keep_buf = keep_buf
if self.keep_buf:
self.epret_buf = deque([], maxlen=keep_buf)
@@ -32,16 +30,11 @@ class VecMonitor(VecEnvWrapper):
obs, rews, dones, infos = self.venv.step_wait()
self.eprets += rews
self.eplens += 1
newinfos = list(infos[:])
for i in range(len(dones)):
if dones[i]:
info = infos[i].copy()
ret = self.eprets[i]
eplen = self.eplens[i]
newinfos = []
for (i, (done, ret, eplen, info)) in enumerate(zip(dones, self.eprets, self.eplens, infos)):
info = info.copy()
if done:
epinfo = {'r': ret, 'l': eplen, 't': round(time.time() - self.tstart, 6)}
for k in self.info_keywords:
epinfo[k] = info[k]
info['episode'] = epinfo
if self.keep_buf:
self.epret_buf.append(ret)
@@ -51,5 +44,6 @@ class VecMonitor(VecEnvWrapper):
self.eplens[i] = 0
if self.results_writer:
self.results_writer.write_row(epinfo)
newinfos[i] = info
newinfos.append(info)
return obs, rews, dones, newinfos

View File

@@ -1,22 +1,18 @@
from . import VecEnvWrapper
from baselines.common.running_mean_std import RunningMeanStd
import numpy as np
class VecNormalize(VecEnvWrapper):
"""
A vectorized wrapper that normalizes the observations
and returns from an environment.
"""
def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8, use_tf=False):
def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8):
VecEnvWrapper.__init__(self, venv)
if use_tf:
from baselines.common.running_mean_std import TfRunningMeanStd
self.ob_rms = TfRunningMeanStd(shape=self.observation_space.shape, scope='ob_rms') if ob else None
self.ret_rms = TfRunningMeanStd(shape=(), scope='ret_rms') if ret else None
else:
from baselines.common.running_mean_std import RunningMeanStd
self.ob_rms = RunningMeanStd(shape=self.observation_space.shape) if ob else None
self.ret_rms = RunningMeanStd(shape=()) if ret else None
self.ob_rms = RunningMeanStd(shape=self.observation_space.shape) if ob else None
self.ret_rms = RunningMeanStd(shape=()) if ret else None
self.clipob = clipob
self.cliprew = cliprew
self.ret = np.zeros(self.num_envs)

View File

@@ -1,5 +1,6 @@
from .vec_env import VecEnvObservationWrapper
class VecExtractDictObs(VecEnvObservationWrapper):
def __init__(self, venv, key):
self.key = key
@@ -7,4 +8,4 @@ class VecExtractDictObs(VecEnvObservationWrapper):
observation_space=venv.observation_space.spaces[self.key])
def process(self, obs):
return obs[self.key]
return obs[self.key]

View File

@@ -16,14 +16,4 @@ class TimeLimit(gym.Wrapper):
def reset(self, **kwargs):
self._elapsed_steps = 0
return self.env.reset(**kwargs)
class ClipActionsWrapper(gym.Wrapper):
def step(self, action):
import numpy as np
action = np.nan_to_num(action)
action = np.clip(action, self.action_space.low, self.action_space.high)
return self.env.step(action)
def reset(self, **kwargs):
return self.env.reset(**kwargs)
return self.env.reset(**kwargs)

View File

@@ -217,9 +217,7 @@ def learn(network, env,
stats = agent.get_stats()
combined_stats = stats.copy()
combined_stats['rollout/return'] = np.mean(epoch_episode_rewards)
combined_stats['rollout/return_std'] = np.std(epoch_episode_rewards)
combined_stats['rollout/return_history'] = np.mean(episode_rewards_history)
combined_stats['rollout/return_history_std'] = np.std(episode_rewards_history)
combined_stats['rollout/episode_steps'] = np.mean(epoch_episode_steps)
combined_stats['rollout/actions_mean'] = np.mean(epoch_actions)
combined_stats['rollout/Q_mean'] = np.mean(epoch_qs)

View File

@@ -378,6 +378,11 @@ class DDPG(object):
self.param_noise_stddev: self.param_noise.current_stddev,
})
if MPI is not None:
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
else:
mean_distance = distance
if MPI is not None:
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
else:

View File

@@ -1,6 +1,10 @@
from baselines.common.tests.util import smoketest
from multiprocessing import Process
import baselines.run
def _run(argstr):
smoketest('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr)
p = Process(target=baselines.run.main, args=('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr).split(' '))
p.start()
p.join()
def test_popart():
_run('--normalize_returns=True --popart=True')

View File

@@ -13,7 +13,7 @@ The functions in this file can are used to create the following functions:
stochastic: bool
if set to False all the actions are always deterministic (default False)
update_eps_ph: float
update epsilon a new value, if negative no update happens
update epsilon a new value, if negative not update happens
(default: no update)
Returns

View File

@@ -142,8 +142,9 @@ def learn(env,
final value of random action probability
train_freq: int
update the model every `train_freq` steps.
set to None to disable printing
batch_size: int
size of a batch sampled from replay buffer for training
size of a batched sampled from replay buffer for training
print_freq: int
how often to print out training progress
set to None to disable printing

View File

@@ -2,6 +2,101 @@ import tensorflow as tf
import tensorflow.contrib.layers as layers
def _mlp(hiddens, input_, num_actions, scope, reuse=False, layer_norm=False):
with tf.variable_scope(scope, reuse=reuse):
out = input_
for hidden in hiddens:
out = layers.fully_connected(out, num_outputs=hidden, activation_fn=None)
if layer_norm:
out = layers.layer_norm(out, center=True, scale=True)
out = tf.nn.relu(out)
q_out = layers.fully_connected(out, num_outputs=num_actions, activation_fn=None)
return q_out
def mlp(hiddens=[], layer_norm=False):
"""This model takes as input an observation and returns values of all actions.
Parameters
----------
hiddens: [int]
list of sizes of hidden layers
layer_norm: bool
if true applies layer normalization for every layer
as described in https://arxiv.org/abs/1607.06450
Returns
-------
q_func: function
q_function for DQN algorithm.
"""
return lambda *args, **kwargs: _mlp(hiddens, layer_norm=layer_norm, *args, **kwargs)
def _cnn_to_mlp(convs, hiddens, dueling, input_, num_actions, scope, reuse=False, layer_norm=False):
with tf.variable_scope(scope, reuse=reuse):
out = input_
with tf.variable_scope("convnet"):
for num_outputs, kernel_size, stride in convs:
out = layers.convolution2d(out,
num_outputs=num_outputs,
kernel_size=kernel_size,
stride=stride,
activation_fn=tf.nn.relu)
conv_out = layers.flatten(out)
with tf.variable_scope("action_value"):
action_out = conv_out
for hidden in hiddens:
action_out = layers.fully_connected(action_out, num_outputs=hidden, activation_fn=None)
if layer_norm:
action_out = layers.layer_norm(action_out, center=True, scale=True)
action_out = tf.nn.relu(action_out)
action_scores = layers.fully_connected(action_out, num_outputs=num_actions, activation_fn=None)
if dueling:
with tf.variable_scope("state_value"):
state_out = conv_out
for hidden in hiddens:
state_out = layers.fully_connected(state_out, num_outputs=hidden, activation_fn=None)
if layer_norm:
state_out = layers.layer_norm(state_out, center=True, scale=True)
state_out = tf.nn.relu(state_out)
state_score = layers.fully_connected(state_out, num_outputs=1, activation_fn=None)
action_scores_mean = tf.reduce_mean(action_scores, 1)
action_scores_centered = action_scores - tf.expand_dims(action_scores_mean, 1)
q_out = state_score + action_scores_centered
else:
q_out = action_scores
return q_out
def cnn_to_mlp(convs, hiddens, dueling=False, layer_norm=False):
"""This model takes as input an observation and returns values of all actions.
Parameters
----------
convs: [(int, int, int)]
list of convolutional layers in form of
(num_outputs, kernel_size, stride)
hiddens: [int]
list of sizes of hidden layers
dueling: bool
if true double the output MLP to compute a baseline
for action scores
layer_norm: bool
if true applies layer normalization for every layer
as described in https://arxiv.org/abs/1607.06450
Returns
-------
q_func: function
q_function for DQN algorithm.
"""
return lambda *args, **kwargs: _cnn_to_mlp(convs, hiddens, dueling, layer_norm=layer_norm, *args, **kwargs)
def build_q_func(network, hiddens=[256], dueling=True, layer_norm=False, **network_kwargs):
if isinstance(network, str):
from baselines.common.models import get_network_builder

View File

@@ -23,7 +23,7 @@ from baselines.gail.dataset.mujoco_dset import Mujoco_Dset
def argsparser():
parser = argparse.ArgumentParser("Tensorflow Implementation of Behavior Cloning")
parser.add_argument('--env_id', help='environment ID', default='Hopper-v2')
parser.add_argument('--env_id', help='environment ID', default='Hopper-v1')
parser.add_argument('--seed', help='RNG seed', type=int, default=0)
parser.add_argument('--expert_path', type=str, default='data/deterministic.trpo.Hopper.0.00.npz')
parser.add_argument('--checkpoint_dir', help='the directory to save model', default='checkpoint')
@@ -73,7 +73,7 @@ def learn(env, policy_func, dataset, optim_batch_size=128, max_iters=1e4,
savedir_fname = tempfile.TemporaryDirectory().name
else:
savedir_fname = osp.join(ckpt_dir, task_name)
U.save_variables(savedir_fname, variables=pi.get_variables())
U.save_state(savedir_fname, var_list=pi.get_variables())
return savedir_fname

View File

@@ -77,7 +77,7 @@ class Mujoco_Dset(object):
self.log_info()
def log_info(self):
logger.log("Total trajectories: %d" % self.num_traj)
logger.log("Total trajectorues: %d" % self.num_traj)
logger.log("Total transitions: %d" % self.num_transition)
logger.log("Average returns: %f" % self.avg_ret)
logger.log("Std for returns: %f" % self.std_ret)

View File

@@ -165,7 +165,7 @@ def runner(env, policy_func, load_model_path, timesteps_per_batch, number_trajs,
U.initialize()
# Prepare for rollouts
# ----------------------------------------
U.load_variables(load_model_path)
U.load_state(load_model_path)
obs_list = []
acs_list = []

View File

@@ -15,7 +15,8 @@ class RolloutWorker:
"""Rollout worker generates experience by interacting with one or many environments.
Args:
venv: vectorized gym environments.
make_env (function): a factory function that creates a new instance of the environment
when called
policy (object): the policy that is used to act
dims (dict of ints): the dimensions for observations (o), goals (g), and actions (u)
logger (object): the logger that is used by the rollout worker

View File

@@ -38,8 +38,8 @@ class HumanOutputFormat(KVWriter, SeqWriter):
# Create strings for printing
key2str = {}
for (key, val) in sorted(kvs.items()):
if hasattr(val, '__float__'):
valstr = '%-8.3g' % val
if isinstance(val, float):
valstr = '%-8.3g' % (val,)
else:
valstr = str(val)
key2str[self._truncate(key)] = self._truncate(valstr)
@@ -92,6 +92,7 @@ class JSONOutputFormat(KVWriter):
def writekvs(self, kvs):
for k, v in sorted(kvs.items()):
if hasattr(v, 'dtype'):
v = v.tolist()
kvs[k] = float(v)
self.file.write(json.dumps(kvs) + '\n')
self.file.flush()
@@ -360,16 +361,7 @@ class Logger(object):
if isinstance(fmt, SeqWriter):
fmt.writeseq(map(str, args))
def get_rank_without_mpi_import():
# check environment variables here instead of importing mpi4py
# to avoid calling MPI_Init() when this module is imported
for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']:
if varname in os.environ:
return int(os.environ[varname])
return 0
def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
def configure(dir=None, format_strs=None, comm=None):
"""
If comm is provided, average all numerical stats across that comm
"""
@@ -379,12 +371,17 @@ def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
dir = osp.join(tempfile.gettempdir(),
datetime.datetime.now().strftime("openai-%Y-%m-%d-%H-%M-%S-%f"))
assert isinstance(dir, str)
dir = os.path.expanduser(dir)
os.makedirs(os.path.expanduser(dir), exist_ok=True)
os.makedirs(dir, exist_ok=True)
rank = get_rank_without_mpi_import()
log_suffix = ''
rank = 0
# check environment variables here instead of importing mpi4py
# to avoid calling MPI_Init() when this module is imported
for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']:
if varname in os.environ:
rank = int(os.environ[varname])
if rank > 0:
log_suffix = log_suffix + "-rank%03i" % rank
log_suffix = "-rank%03i" % rank
if format_strs is None:
if rank == 0:
@@ -395,8 +392,7 @@ def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
output_formats = [make_output_format(f, dir, log_suffix) for f in format_strs]
Logger.CURRENT = Logger(dir=dir, output_formats=output_formats, comm=comm)
if output_formats:
log('Logging to %s'%dir)
log('Logging to %s'%dir)
def _configure_default_logger():
configure()

View File

@@ -3,6 +3,16 @@
- Original paper: https://arxiv.org/abs/1707.06347
- Baselines blog post: https://blog.openai.com/openai-baselines-ppo/
## Examples
- `python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4` runs the algorithm for 40M frames = 10M timesteps on an Atari Pong. See help (`-h`) for more options.
- `python -m baselines.run --alg=ppo2 --env=Ant-v2 --num_timesteps=1e6` runs the algorithm for 1M frames on a Mujoco Ant environment.
- also refer to the repo-wide [README.md](../../README.md#training-models)
### RNN networks
- `python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4 --network=ppo_cnn_lstm` runs on an Atari Pong with
`ppo_cnn_lstm` network.
- `python -m baselines.run --alg=ppo2 --env=Ant-v2 --num_timesteps=1e6 --network=ppo_lstm --value_network=copy`
runs on a Mujoco Ant environment with `ppo_lstm` network whose value and policy networks are separated, but have
same structure.
## See Also
- refer to the repo-wide [README.md](../../README.md#training-models)

View File

@@ -0,0 +1 @@
from baselines.ppo2.layers import ppo_lstm, ppo_cnn_lstm, ppo_cnn_lnlstm # pylint: disable=unused-import # noqa: F401

55
baselines/ppo2/layers.py Normal file
View File

@@ -0,0 +1,55 @@
import numpy as np
import tensorflow as tf
from baselines.a2c.utils import ortho_init, lstm, lnlstm
from baselines.common.models import register, nature_cnn
class RNN(object):
def __init__(self, func, memory_size=None):
self._func = func
self.memory_size = memory_size
def __call__(self, *args, **kwargs):
return self._func(*args, **kwargs)
@register("ppo_lstm")
def ppo_lstm(num_units=128, layer_norm=False):
def network_fn(input, mask, state):
input = tf.layers.flatten(input)
mask = tf.to_float(mask)
if layer_norm:
h, next_state = lnlstm([input], [mask[:, None]], state, scope='lnlstm', nh=num_units)
else:
h, next_state = lstm([input], [mask[:, None]], state, scope='lstm', nh=num_units)
h = h[0]
return h, next_state
return RNN(network_fn, memory_size=num_units * 2)
@register("ppo_cnn_lstm")
def ppo_cnn_lstm(num_units=128, layer_norm=False, **conv_kwargs):
def network_fn(input, mask, state):
mask = tf.to_float(mask)
initializer = ortho_init(np.sqrt(2))
h = nature_cnn(input, **conv_kwargs)
h = tf.layers.flatten(h)
h = tf.layers.dense(h, units=512, activation=tf.nn.relu, kernel_initializer=initializer)
if layer_norm:
h, next_state = lnlstm([h], [mask[:, None]], state, scope='lnlstm', nh=num_units)
else:
h, next_state = lstm([h], [mask[:, None]], state, scope='lstm', nh=num_units)
h = h[0]
return h, next_state
return RNN(network_fn, memory_size=num_units * 2)
@register("ppo_cnn_lnlstm")
def ppo_cnn_lnlstm(num_units=128, **conv_kwargs):
return ppo_cnn_lstm(num_units, layer_norm=True, **conv_kwargs)

View File

@@ -1,44 +1,47 @@
import tensorflow as tf
import numpy as np
import tensorflow as tf
from baselines.ppo2.model import Model
class MicrobatchedModel(Model):
"""
Model that does training one microbatch at a time - when gradient computation
on the entire minibatch causes some overflow
"""
def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight, comm, microbatch_size):
nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size):
self.nmicrobatches = nbatch_train // microbatch_size
self.microbatch_size = microbatch_size
assert nbatch_train % microbatch_size == 0, 'microbatch_size ({}) should divide nbatch_train ({}) evenly'.format(microbatch_size, nbatch_train)
assert nbatch_train % microbatch_size == 0, 'microbatch_size ({}) should divide nbatch_train ({}) evenly'.format(
microbatch_size, nbatch_train)
super().__init__(
policy=policy,
ob_space=ob_space,
ac_space=ac_space,
nbatch_act=nbatch_act,
nbatch_train=microbatch_size,
nsteps=nsteps,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm,
mpi_rank_weight=mpi_rank_weight,
comm=comm)
policy=policy,
ob_space=ob_space,
ac_space=ac_space,
nbatch_act=nbatch_act,
nbatch_train=microbatch_size,
nsteps=nsteps,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm)
self.grads_ph = [tf.placeholder(dtype=g.dtype, shape=g.shape) for g in self.grads]
grads_ph_and_vars = list(zip(self.grads_ph, self.var))
self._apply_gradients_op = self.trainer.apply_gradients(grads_ph_and_vars)
def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
assert states is None, "microbatches with recurrent models are not supported yet"
# Here we calculate advantage A(s,a) = R + yV(s') - V(s)
# Returns = R + yV(s')
advs = returns - values
def train(self,
lr,
cliprange,
observations,
advs,
returns,
actions,
values,
neglogpacs,
**_kwargs):
# Normalize the advantages
advs = (advs - advs.mean()) / (advs.std() + 1e-8)
@@ -46,19 +49,24 @@ class MicrobatchedModel(Model):
stats_vs = []
for microbatch_idx in range(self.nmicrobatches):
_sli = range(microbatch_idx * self.microbatch_size, (microbatch_idx+1) * self.microbatch_size)
_sli = range(microbatch_idx * self.microbatch_size, (microbatch_idx + 1) * self.microbatch_size)
td_map = {
self.train_model.X: obs[_sli],
self.A:actions[_sli],
self.ADV:advs[_sli],
self.R:returns[_sli],
self.CLIPRANGE:cliprange,
self.OLDNEGLOGPAC:neglogpacs[_sli],
self.OLDVPRED:values[_sli]
self.train_model.X: observations[_sli],
self.A: actions[_sli],
self.ADV: advs[_sli],
self.RETURNS: returns[_sli],
self.LR: lr,
self.CLIPRANGE: cliprange,
self.OLDNEGLOGPAC: neglogpacs[_sli],
self.VALUE_PREV: values[_sli],
}
sliced_kwargs = {key: _kwargs[key][_sli] for key in _kwargs}
td_map.update(self.train_model.feed_dict(**sliced_kwargs))
# Compute gradient on a microbatch (note that variables do not change here) ...
grad_v, stats_v = self.sess.run([self.grads, self.stats_list], td_map)
grad_v, stats_v = self.sess.run([self.grads, self.stats_list], td_map)
if microbatch_idx == 0:
sum_grad_v = grad_v
else:
@@ -73,6 +81,3 @@ class MicrobatchedModel(Model):
self.sess.run(self._apply_gradients_op, feed_dict)
# Return average of the stats
return np.mean(np.array(stats_vs), axis=0).tolist()

View File

@@ -1,8 +1,8 @@
import tensorflow as tf
import functools
import tensorflow as tf
from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.common.tf_util import initialize
try:
from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
@@ -11,6 +11,7 @@ try:
except ImportError:
MPI = None
class Model(object):
"""
We use this object to :
@@ -24,136 +25,157 @@ class Model(object):
save/load():
- Save load the model
"""
def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight=1, comm=None, microbatch_size=None):
self.sess = sess = get_session()
nsteps, ent_coef, vf_coef, max_grad_norm,
name='ppo_model',
sess=None,
microbatch_size=None):
if sess is None:
sess = get_session()
self.sess = sess
self.name = name
if MPI is not None and comm is None:
comm = MPI.COMM_WORLD
with tf.variable_scope(name) as scope:
self.scope = scope
with tf.variable_scope('models', reuse=tf.AUTO_REUSE):
with tf.name_scope('act_model'):
# CREATE OUR TWO MODELS
# act_model that is used for sampling
act_model = policy(nbatch_act, 1, sess)
with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE):
# CREATE OUR TWO MODELS
# act_model that is used for sampling
act_model = policy(nbatch_act, 1, sess)
with tf.name_scope('train_model'):
# Train model for training
if microbatch_size is None:
train_model = policy(nbatch_train, nsteps, sess)
else:
train_model = policy(microbatch_size, nsteps, sess)
# Train model for training
if microbatch_size is None:
train_model = policy(nbatch_train, nsteps, sess)
else:
train_model = policy(microbatch_size, nsteps, sess)
with tf.variable_scope('losses'):
# CREATE THE PLACEHOLDERS
self.A = A = train_model.pdtype.sample_placeholder([None], name='action')
self.ADV = ADV = tf.placeholder(tf.float32, [None], name='advantage')
self.RETURNS = RETURNS = tf.placeholder(tf.float32, [None], name='reward')
self.VALUE_PREV = VALUE_PREV = tf.placeholder(tf.float32, [None], name='value_prev')
self.OLDNEGLOGPAC = OLDNEGLOGPAC = tf.placeholder(tf.float32, [None],
name='negative_log_p_action_old')
self.CLIPRANGE = CLIPRANGE = tf.placeholder(tf.float32, [], name='clip_range')
# CREATE THE PLACEHOLDERS
self.A = A = train_model.pdtype.sample_placeholder([None])
self.ADV = ADV = tf.placeholder(tf.float32, [None])
self.R = R = tf.placeholder(tf.float32, [None])
# Keep track of old actor
self.OLDNEGLOGPAC = OLDNEGLOGPAC = tf.placeholder(tf.float32, [None])
# Keep track of old critic
self.OLDVPRED = OLDVPRED = tf.placeholder(tf.float32, [None])
self.LR = LR = tf.placeholder(tf.float32, [])
# Cliprange
self.CLIPRANGE = CLIPRANGE = tf.placeholder(tf.float32, [])
with tf.name_scope("neglogpac"):
neglogpac = train_model.pd.neglogp(A)
neglogpac = train_model.pd.neglogp(A)
with tf.name_scope("entropy"):
# Calculate the entropy
# Entropy is used to improve exploration by limiting the premature convergence to suboptimal policy.
entropy = tf.reduce_mean(train_model.pd.entropy())
entropy_loss = (- ent_coef) * entropy
# Calculate the entropy
# Entropy is used to improve exploration by limiting the premature convergence to suboptimal policy.
entropy = tf.reduce_mean(train_model.pd.entropy())
with tf.name_scope("value_loss"):
# CALCULATE THE LOSS
value = train_model.value
value_clipped = VALUE_PREV + tf.clip_by_value(value - VALUE_PREV, -CLIPRANGE, CLIPRANGE)
vf_losses1 = tf.squared_difference(value, RETURNS)
vf_losses2 = tf.squared_difference(value_clipped, RETURNS)
vf_loss = 0.5 * vf_coef * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
# CALCULATE THE LOSS
# Total loss = Policy gradient loss - entropy * entropy coefficient + Value coefficient * value loss
with tf.name_scope("policy_loss"):
# Calculate ratio (pi current policy / pi old policy)
ratio = tf.exp(OLDNEGLOGPAC - neglogpac)
pg_losses = -ADV * ratio
pg_losses2 = -ADV * tf.clip_by_value(ratio, 1.0 - CLIPRANGE, 1.0 + CLIPRANGE)
pg_loss = tf.reduce_mean(tf.maximum(pg_losses, pg_losses2))
# Clip the value to reduce variability during Critic training
# Get the predicted value
vpred = train_model.vf
vpredclipped = OLDVPRED + tf.clip_by_value(train_model.vf - OLDVPRED, - CLIPRANGE, CLIPRANGE)
# Unclipped value
vf_losses1 = tf.square(vpred - R)
# Clipped value
vf_losses2 = tf.square(vpredclipped - R)
with tf.name_scope("approxkl"):
approxkl = .5 * tf.reduce_mean(tf.squared_difference(neglogpac, OLDNEGLOGPAC))
vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
with tf.name_scope("clip_fraction"):
clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE)))
# Calculate ratio (pi current policy / pi old policy)
ratio = tf.exp(OLDNEGLOGPAC - neglogpac)
with tf.name_scope("total_loss"):
loss = pg_loss + entropy_loss + vf_loss
# Defining Loss = - J is equivalent to max J
pg_losses = -ADV * ratio
with tf.variable_scope('optimizer'):
self.LR = LR = tf.placeholder(tf.float32, [], name='learning_rate')
pg_losses2 = -ADV * tf.clip_by_value(ratio, 1.0 - CLIPRANGE, 1.0 + CLIPRANGE)
# UPDATE THE PARAMETERS USING LOSS
# 1. Get the model parameters
params = tf.trainable_variables(self.scope.name)
# Final PG loss
pg_loss = tf.reduce_mean(tf.maximum(pg_losses, pg_losses2))
approxkl = .5 * tf.reduce_mean(tf.square(neglogpac - OLDNEGLOGPAC))
clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE)))
# 2. Build our trainer
if MPI is not None:
self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
else:
self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients
grads_and_var = self.trainer.compute_gradients(loss, params)
grads, var = zip(*grads_and_var)
# Total loss
loss = pg_loss - entropy * ent_coef + vf_loss * vf_coef
if max_grad_norm is not None:
# Clip the gradients (normalize)
grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm)
grads_and_var = list(zip(grads, var))
# UPDATE THE PARAMETERS USING LOSS
# 1. Get the model parameters
params = tf.trainable_variables('ppo2_model')
# 2. Build our trainer
if comm is not None and comm.Get_size() > 1:
self.trainer = MpiAdamOptimizer(comm, learning_rate=LR, mpi_rank_weight=mpi_rank_weight, epsilon=1e-5)
else:
self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients
grads_and_var = self.trainer.compute_gradients(loss, params)
grads, var = zip(*grads_and_var)
self.grads = grads
self.var = var
self._train_op = self.trainer.apply_gradients(grads_and_var)
if max_grad_norm is not None:
# Clip the gradients (normalize)
grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm)
grads_and_var = list(zip(grads, var))
# zip aggregate each gradient with parameters associated
# For instance zip(ABCD, xyza) => Ax, By, Cz, Da
self.loss_names = ['policy_loss', 'value_loss', 'entropy_loss', 'approxkl', 'clipfrac',
'total_loss']
self.stats_list = [pg_loss, vf_loss, entropy_loss, approxkl, clipfrac, loss]
self.grads = grads
self.var = var
self._train_op = self.trainer.apply_gradients(grads_and_var)
self.loss_names = ['policy_loss', 'value_loss', 'policy_entropy', 'approxkl', 'clipfrac']
self.stats_list = [pg_loss, vf_loss, entropy, approxkl, clipfrac]
self.train_model = train_model
self.act_model = act_model
self.initial_state = act_model.initial_state
self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess)
self.train_model = train_model
self.act_model = act_model
self.step = act_model.step
self.value = act_model.value
self.initial_state = act_model.initial_state
with tf.variable_scope('initialization'):
sess.run(tf.initializers.variables(tf.global_variables(self.scope.name)))
sess.run(tf.initializers.variables(tf.local_variables(self.scope.name)))
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=self.scope.name)
if MPI is not None:
sync_from_root(sess, global_variables) # pylint: disable=E1101
self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess)
def step_with_dict(self, **kwargs):
return self.act_model.step(**kwargs)
initialize()
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
if MPI is not None:
sync_from_root(sess, global_variables, comm=comm) #pylint: disable=E1101
def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
# Here we calculate advantage A(s,a) = R + yV(s') - V(s)
# Returns = R + yV(s')
advs = returns - values
def step(self, obs, M=None, S=None, **kwargs):
kwargs.update({'observations': obs})
if M is not None and S is not None:
kwargs.update({'dones': M})
kwargs.update({'states': S})
transition = self.act_model.step(**kwargs)
states = transition['next_states'] if 'next_states' in transition else None
return transition['actions'], transition['values'], states, transition['neglogpacs']
def train(self,
lr,
cliprange,
observations,
advs,
returns,
actions,
values,
neglogpacs,
**_kwargs):
# Normalize the advantages
advs = (advs - advs.mean()) / (advs.std() + 1e-8)
td_map = {
self.train_model.X : obs,
self.A : actions,
self.ADV : advs,
self.R : returns,
self.LR : lr,
self.CLIPRANGE : cliprange,
self.OLDNEGLOGPAC : neglogpacs,
self.OLDVPRED : values
self.train_model.X: observations,
self.A: actions,
self.ADV: advs,
self.RETURNS: returns,
self.LR: lr,
self.CLIPRANGE: cliprange,
self.OLDNEGLOGPAC: neglogpacs,
self.VALUE_PREV: values,
}
if states is not None:
td_map[self.train_model.S] = states
td_map[self.train_model.M] = masks
td_map.update(self.train_model.feed_dict(**_kwargs))
return self.sess.run(
self.stats_list + [self._train_op],
td_map
)[:-1]

188
baselines/ppo2/policies.py Normal file
View File

@@ -0,0 +1,188 @@
import gym
import numpy as np
import tensorflow as tf
from baselines.a2c.utils import fc
from baselines.common import tf_util
from baselines.common.distributions import make_pdtype
from baselines.common.input import observation_placeholder, encode_observation
from baselines.common.models import get_network_builder
from baselines.common.tf_util import adjust_shape
from baselines.ppo2.layers import RNN
class PolicyWithValue(object):
"""
Encapsulates fields and methods for RL policy and two value function estimation with shared parameters
"""
def __init__(self, env, observations, latent, dones, states=None, estimate_q=False, vf_latent=None, sess=None):
"""
Parameters:
----------
env RL environment
observations tensorflow placeholder in which the observations will be fed
latent latent state from which policy distribution parameters should be inferred
vf_latent latent state from which value function should be inferred (if None, then latent is used)
sess tensorflow session to run calculations in (if None, default session is used)
**tensors tensorflow tensors for additional attributes such as state or mask
"""
self.X = observations
self.dones = dones
self.pdtype = make_pdtype(env.action_space)
self.states = states
self.sess = sess or tf.get_default_session()
vf_latent = vf_latent if vf_latent is not None else latent
with tf.variable_scope('policy'):
latent = tf.layers.flatten(latent)
# Based on the action space, will select what probability distribution type
self.pd, self.pi = self.pdtype.pdfromlatent(latent, init_scale=0.01)
with tf.variable_scope('sample_action'):
self.action = self.pd.sample()
with tf.variable_scope('negative_log_probability'):
# Calculate the neg log of our probability
self.neglogp = self.pd.neglogp(self.action)
with tf.variable_scope('value'):
vf_latent = tf.layers.flatten(vf_latent)
if estimate_q:
assert isinstance(env.action_space, gym.spaces.Discrete)
self.q = fc(vf_latent, 'q', env.action_space.n)
self.value = self.q
else:
vf_latent = tf.layers.flatten(vf_latent)
self.value = fc(vf_latent, 'value', 1, init_scale=0.01)
self.value = self.value[:, 0]
self.step_input = {
'observations': observations,
'dones': self.dones,
}
self.step_output = {
'actions': self.action,
'values': self.value,
'neglogpacs': self.neglogp,
}
if self.states:
self.initial_state = np.zeros(self.states['current'].get_shape())
self.step_input.update({'states': self.states['current']})
self.step_output.update({'states': self.states['current'],
'next_states': self.states['next']})
else:
self.initial_state = None
def feed_dict(self, **kwargs):
feed_dict = {}
for key in kwargs:
if key in self.step_input:
feed_dict[self.step_input[key]] = adjust_shape(self.step_input[key], kwargs[key])
return feed_dict
def step(self, **kwargs):
return self.sess.run(self.step_output,
feed_dict=self.feed_dict(**kwargs))
def values(self, **kwargs):
return self.sess.run({'values': self.value},
feed_dict=self.feed_dict(**kwargs))
def save(self, save_path):
tf_util.save_state(save_path, sess=self.sess)
def load(self, load_path):
tf_util.load_state(load_path, sess=self.sess)
def build_ppo_policy(env, policy_network, value_network=None, estimate_q=False, **policy_kwargs):
if isinstance(policy_network, str):
network_type = policy_network
policy_network = get_network_builder(network_type)(**policy_kwargs)
if value_network is None:
value_network = 'shared'
def policy_fn(nbatch=None, nsteps=None, sess=None, observ_placeholder=None):
next_states_list = []
state_map = {}
state_placeholder = None
ob_space = env.observation_space
X = observ_placeholder if observ_placeholder is not None else observation_placeholder(ob_space,
batch_size=nbatch)
dones = tf.placeholder(tf.float32, shape=[X.shape[0]], name='dones')
encoded_x = encode_observation(ob_space, X)
with tf.variable_scope('current_rnn_memory'):
if value_network == 'shared':
value_network_ = value_network
else:
if value_network == 'copy':
value_network_ = policy_network
else:
assert callable(value_network)
value_network_ = value_network
policy_memory_size = policy_network.memory_size if isinstance(policy_network, RNN) else 0
value_memory_size = value_network_.memory_size if isinstance(value_network_, RNN) else 0
state_size = policy_memory_size + value_memory_size
if state_size > 0:
state_placeholder = tf.placeholder(dtype=tf.float32, shape=(nbatch, state_size),
name='states')
state_map['policy'] = state_placeholder[:, 0:policy_memory_size]
state_map['value'] = state_placeholder[:, policy_memory_size:]
with tf.variable_scope('policy_latent', reuse=tf.AUTO_REUSE):
if isinstance(policy_network, RNN):
assert policy_memory_size > 0
policy_latent, next_policy_state = \
policy_network(encoded_x, dones, state_map['policy'])
next_states_list.append(next_policy_state)
else:
policy_latent = policy_network(encoded_x)
with tf.variable_scope('value_latent', reuse=tf.AUTO_REUSE):
if value_network_ == 'shared':
value_latent = policy_latent
elif isinstance(value_network_, RNN):
assert value_memory_size > 0
value_latent, next_value_state = \
value_network_(encoded_x, dones, state_map['value'])
next_states_list.append(next_value_state)
else:
value_latent = value_network_(encoded_x)
with tf.name_scope("next_rnn_memory"):
if state_size > 0:
next_states = tf.concat(next_states_list, axis=1)
state_info = {'current': state_placeholder,
'next': next_states, }
else:
state_info = None
policy = PolicyWithValue(
env=env,
observations=X,
dones=dones,
latent=policy_latent,
vf_latent=value_latent,
states=state_info,
sess=sess,
estimate_q=estimate_q,
)
return policy
return policy_fn

View File

@@ -1,28 +1,35 @@
import os
import time
import numpy as np
import os.path as osp
from baselines import logger
import time
from collections import deque
from baselines.common import explained_variance, set_global_seeds
from baselines.common.policies import build_policy
import numpy as np
import tensorflow as tf
from baselines import logger
from baselines.common import explained_variance
from baselines.common import set_global_seeds
from baselines.common.tf_util import display_var_info
from baselines.ppo2.policies import build_ppo_policy
from baselines.ppo2.runner import Runner
try:
from mpi4py import MPI
except ImportError:
MPI = None
from baselines.ppo2.runner import Runner
def constfn(val):
def f(_):
return val
return f
def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4,
vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
save_interval=0, load_path=None, model_fn=None, update_fn=None, init_fn=None, mpi_rank_weight=1, comm=None, **network_kwargs):
'''
def learn(*, network, env, total_timesteps, eval_env=None, seed=None, nsteps=128, ent_coef=0.0, lr=3e-4,
vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
save_interval=10, load_path=None, model_fn=None, **network_kwargs):
"""
Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347)
Parameters:
@@ -52,7 +59,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
max_grad_norm: float or None gradient norm clipping coefficient
gamma: float discounting factor
gamma: float discounting factor for rewards
lam: float advantage estimation discounting factor (lambda in the paper)
@@ -72,20 +79,21 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
**network_kwargs: keyword arguments to the policy / network builder. See baselines.common/policies.py/build_policy and arguments to a particular type of network
For instance, 'mlp' network architecture has arguments num_hidden and num_layers.
'''
"""
set_global_seeds(seed)
if isinstance(lr, float): lr = constfn(lr)
else: assert callable(lr)
if isinstance(cliprange, float): cliprange = constfn(cliprange)
else: assert callable(cliprange)
if isinstance(lr, float):
lr = constfn(lr)
else:
assert callable(lr)
if isinstance(cliprange, float):
cliprange = constfn(cliprange)
else:
assert callable(cliprange)
total_timesteps = int(total_timesteps)
policy = build_policy(env, network, **network_kwargs)
policy = build_ppo_policy(env, network, **network_kwargs)
# Get the nb of env
nenvs = env.num_envs
@@ -97,7 +105,6 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
# Calculate the batch_size
nbatch = nenvs * nsteps
nbatch_train = nbatch // nminibatches
is_mpi_root = (MPI is None or MPI.COMM_WORLD.Get_rank() == 0)
# Instantiate the model object (that creates act_model and train_model)
if model_fn is None:
@@ -105,28 +112,29 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
model_fn = Model
model = model_fn(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=nenvs, nbatch_train=nbatch_train,
nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef,
max_grad_norm=max_grad_norm, comm=comm, mpi_rank_weight=mpi_rank_weight)
nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef, max_grad_norm=max_grad_norm)
if load_path is not None:
model.load(load_path)
allvars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=model.name)
display_var_info(allvars)
# Instantiate the runner object
runner = Runner(env=env, model=model, nsteps=nsteps, gamma=gamma, lam=lam)
runner = Runner(env=env, model=model, nsteps=nsteps, gamma=gamma, ob_space=ob_space, lam=lam)
if eval_env is not None:
eval_runner = Runner(env = eval_env, model = model, nsteps = nsteps, gamma = gamma, lam= lam)
eval_runner = Runner(env=eval_env, model=model, nsteps=nsteps, gamma=gamma, ob_space=ob_space, lam=lam)
epinfobuf = deque(maxlen=100)
if eval_env is not None:
eval_epinfobuf = deque(maxlen=100)
if init_fn is not None:
init_fn()
# Start total timer
tfirststart = time.perf_counter()
nupdates = total_timesteps // nbatch
nupdates = total_timesteps//nbatch
for update in range(1, nupdates+1):
for update in range(1, nupdates + 1):
assert nbatch % nminibatches == 0
# Start timer
tstart = time.perf_counter()
@@ -136,48 +144,39 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
# Calculate the cliprange
cliprangenow = cliprange(frac)
if update % log_interval == 0 and is_mpi_root: logger.info('Stepping environment...')
# Get minibatch
obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632
minibatch = runner.run()
if eval_env is not None:
eval_obs, eval_returns, eval_masks, eval_actions, eval_values, eval_neglogpacs, eval_states, eval_epinfos = eval_runner.run() #pylint: disable=E0632
eval_minibatch = eval_runner.run()
_eval_obs = eval_minibatch['observations'] # noqa: F841
_eval_returns = eval_minibatch['returns'] # noqa: F841
_eval_masks = eval_minibatch['masks'] # noqa: F841
_eval_actions = eval_minibatch['actions'] # noqa: F841
_eval_values = eval_minibatch['values'] # noqa: F841
_eval_neglogpacs = eval_minibatch['neglogpacs'] # noqa: F841
_eval_states = eval_minibatch['state'] # noqa: F841
eval_epinfos = eval_minibatch['epinfos']
if update % log_interval == 0 and is_mpi_root: logger.info('Done.')
epinfobuf.extend(epinfos)
epinfobuf.extend(minibatch.pop('epinfos'))
if eval_env is not None:
eval_epinfobuf.extend(eval_epinfos)
# Here what we're going to do is for each minibatch calculate the loss and append it.
mblossvals = []
if states is None: # nonrecurrent version
# Index of each element of batch_size
# Create the indices array
inds = np.arange(nbatch)
for _ in range(noptepochs):
# Randomize the indexes
np.random.shuffle(inds)
# 0 to batch_size with batch_train_size step
for start in range(0, nbatch, nbatch_train):
end = start + nbatch_train
mbinds = inds[start:end]
slices = (arr[mbinds] for arr in (obs, returns, masks, actions, values, neglogpacs))
mblossvals.append(model.train(lrnow, cliprangenow, *slices))
else: # recurrent version
assert nenvs % nminibatches == 0
envsperbatch = nenvs // nminibatches
envinds = np.arange(nenvs)
flatinds = np.arange(nenvs * nsteps).reshape(nenvs, nsteps)
for _ in range(noptepochs):
np.random.shuffle(envinds)
for start in range(0, nenvs, envsperbatch):
end = start + envsperbatch
mbenvinds = envinds[start:end]
mbflatinds = flatinds[mbenvinds].ravel()
slices = (arr[mbflatinds] for arr in (obs, returns, masks, actions, values, neglogpacs))
mbstates = states[mbenvinds]
mblossvals.append(model.train(lrnow, cliprangenow, *slices, mbstates))
# Index of each element of batch_size
# Create the indices array
inds = np.arange(nbatch)
for _ in range(noptepochs):
# Randomize the indexes
np.random.shuffle(inds)
# 0 to batch_size with batch_train_size step
for start in range(0, nbatch, nbatch_train):
end = start + nbatch_train
mbinds = inds[start:end]
slices = {key: minibatch[key][mbinds] for key in minibatch}
mblossvals.append(model.train(lrnow, cliprangenow, **slices))
# Feedforward --> get losses --> update
lossvals = np.mean(mblossvals, axis=0)
@@ -185,40 +184,39 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
tnow = time.perf_counter()
# Calculate the fps (frame per second)
fps = int(nbatch / (tnow - tstart))
if update_fn is not None:
update_fn(update)
if update % log_interval == 0 or update == 1:
# Calculates if value function is a good predicator of the returns (ev > 1)
# or if it's just worse than predicting nothing (ev =< 0)
ev = explained_variance(values, returns)
logger.logkv("misc/serial_timesteps", update*nsteps)
logger.logkv("misc/nupdates", update)
logger.logkv("misc/total_timesteps", update*nbatch)
ev = explained_variance(minibatch['values'], minibatch['returns'])
logger.logkv("serial_timesteps", update * nsteps)
logger.logkv("nupdates", update)
logger.logkv("total_timesteps", update * nbatch)
logger.logkv("fps", fps)
logger.logkv("misc/explained_variance", float(ev))
logger.logkv("explained_variance", float(ev))
logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf]))
if eval_env is not None:
logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]) )
logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]) )
logger.logkv('misc/time_elapsed', tnow - tfirststart)
for (lossval, lossname) in zip(lossvals, model.loss_names):
logger.logkv('loss/' + lossname, lossval)
logger.logkv('rewards_per_step', safemean(minibatch['rewards']))
logger.logkv('advantages_per_step', safemean(minibatch['advs']))
logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and is_mpi_root:
if eval_env is not None:
logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]))
logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]))
logger.logkv('time_elapsed', tnow - tfirststart)
for (lossval, lossname) in zip(lossvals, model.loss_names):
logger.logkv(lossname, lossval)
if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (
MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
checkdir = osp.join(logger.get_dir(), 'checkpoints')
os.makedirs(checkdir, exist_ok=True)
savepath = osp.join(checkdir, '%.5i'%update)
savepath = osp.join(checkdir, '%.5i' % update)
print('Saving to', savepath)
model.save(savepath)
del minibatch
return model
# Avoid division error when calculate the mean (in our case if epinfo is empty returns np.nan, not return an error)
def safemean(xs):
return np.nan if len(xs) == 0 else np.mean(xs)

Binary file not shown.

After

Width:  |  Height:  |  Size: 177 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

View File

@@ -1,6 +1,8 @@
import numpy as np
from baselines.common.runners import AbstractEnvRunner
class Runner(AbstractEnvRunner):
"""
We use this object to make a mini batch of experiences
@@ -10,67 +12,118 @@ class Runner(AbstractEnvRunner):
run():
- Make a mini batch
"""
def __init__(self, *, env, model, nsteps, gamma, lam):
def __init__(self, *, env, model, nsteps, gamma, ob_space, lam):
super().__init__(env=env, model=model, nsteps=nsteps)
# Lambda used in GAE (General Advantage Estimation)
self.lam = lam
# Discount rate
self.gamma = gamma
self.lam = lam # Lambda used in GAE (General Advantage Estimation)
self.gamma = gamma # Discount rate for rewards
self.ob_space = ob_space
def run(self):
# Here, we init the lists that will contain the mb of experiences
mb_obs, mb_rewards, mb_actions, mb_values, mb_dones, mb_neglogpacs = [],[],[],[],[],[]
mb_states = self.states
minibatch = {
"observations": [],
"actions": [],
"rewards": [],
"values": [],
"dones": [],
"neglogpacs": [],
}
data_type = {
"observations": self.obs.dtype,
"actions": np.float32,
"rewards": np.float32,
"values": np.float32,
"dones": np.float32,
"neglogpacs": np.float32,
}
prev_transition = {'next_states': self.model.initial_state} if self.model.initial_state is not None else {}
epinfos = []
# For n in range number of steps
for _ in range(self.nsteps):
# Given observations, get action value and neglopacs
# We already have self.obs because Runner superclass run self.obs[:] = env.reset() on init
actions, values, self.states, neglogpacs = self.model.step(self.obs, S=self.states, M=self.dones)
mb_obs.append(self.obs.copy())
mb_actions.append(actions)
mb_values.append(values)
mb_neglogpacs.append(neglogpacs)
mb_dones.append(self.dones)
transitions = {}
transitions['observations'] = self.obs.copy()
transitions['dones'] = self.dones
if 'next_states' in prev_transition:
transitions['states'] = prev_transition['next_states']
transitions.update(self.model.step_with_dict(**transitions))
# Take actions in env and look the results
# Infos contains a ton of useful informations
self.obs[:], rewards, self.dones, infos = self.env.step(actions)
self.obs, transitions['rewards'], self.dones, infos = self.env.step(transitions['actions'])
self.dones = np.array(self.dones, dtype=np.float)
for info in infos:
maybeepinfo = info.get('episode')
if maybeepinfo: epinfos.append(maybeepinfo)
mb_rewards.append(rewards)
#batch of steps to batch of rollouts
mb_obs = np.asarray(mb_obs, dtype=self.obs.dtype)
mb_rewards = np.asarray(mb_rewards, dtype=np.float32)
mb_actions = np.asarray(mb_actions)
mb_values = np.asarray(mb_values, dtype=np.float32)
mb_neglogpacs = np.asarray(mb_neglogpacs, dtype=np.float32)
mb_dones = np.asarray(mb_dones, dtype=np.bool)
last_values = self.model.value(self.obs, S=self.states, M=self.dones)
if maybeepinfo:
epinfos.append(maybeepinfo)
# discount/bootstrap off value fn
mb_returns = np.zeros_like(mb_rewards)
mb_advs = np.zeros_like(mb_rewards)
lastgaelam = 0
for key in transitions:
if key not in minibatch:
minibatch[key] = []
minibatch[key].append(transitions[key])
prev_transition = transitions
for key in minibatch:
dtype = data_type[key] if key in data_type else np.float
minibatch[key] = np.array(minibatch[key], dtype=dtype)
transitions['observations'] = self.obs.copy()
transitions['dones'] = self.dones
if 'states' in transitions:
transitions['states'] = transitions.pop('next_states')
for key in minibatch:
dtype = data_type[key] if key in data_type else np.float
minibatch[key] = np.asarray(minibatch[key], dtype=dtype)
last_values = self.model.step_with_dict(**transitions)['values']
# Calculate returns and advantages.
minibatch['advs'], minibatch['returns'] = \
self.advantage_and_returns(values=minibatch['values'],
rewards=minibatch['rewards'],
dones=minibatch['dones'],
last_values=last_values,
last_dones=self.dones,
gamma=self.gamma)
for key in minibatch:
minibatch[key] = sf01(minibatch[key])
minibatch['epinfos'] = epinfos
return minibatch
def advantage_and_returns(self, values, rewards, dones, last_values, last_dones, gamma,
use_non_episodic_rewards=False):
"""
calculate Generalized Advantage Estimation (GAE), https://arxiv.org/abs/1506.02438
see also Proximal Policy Optimization Algorithms, https://arxiv.org/abs/1707.06347
"""
advantages = np.zeros_like(rewards)
lastgaelam = 0 # Lambda used in General Advantage Estimation
for t in reversed(range(self.nsteps)):
if t == self.nsteps - 1:
nextnonterminal = 1.0 - self.dones
nextvalues = last_values
if not use_non_episodic_rewards:
if t == self.nsteps - 1:
next_non_terminal = 1.0 - last_dones
else:
next_non_terminal = 1.0 - dones[t + 1]
else:
nextnonterminal = 1.0 - mb_dones[t+1]
nextvalues = mb_values[t+1]
delta = mb_rewards[t] + self.gamma * nextvalues * nextnonterminal - mb_values[t]
mb_advs[t] = lastgaelam = delta + self.gamma * self.lam * nextnonterminal * lastgaelam
mb_returns = mb_advs + mb_values
return (*map(sf01, (mb_obs, mb_returns, mb_dones, mb_actions, mb_values, mb_neglogpacs)),
mb_states, epinfos)
# obs, returns, masks, actions, values, neglogpacs, states = runner.run()
next_non_terminal = 1.0
next_value = values[t + 1] if t < self.nsteps - 1 else last_values
delta = rewards[t] + gamma * next_value * next_non_terminal - values[t]
advantages[t] = lastgaelam = delta + gamma * self.lam * next_non_terminal * lastgaelam
returns = advantages + values
return advantages, returns
def sf01(arr):
"""
swap and then flatten axes 0 and 1
"""
s = arr.shape
return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])

View File

@@ -32,7 +32,7 @@ except ImportError:
_game_envs = defaultdict(set)
for env in gym.envs.registry.all():
# TODO: solve this with regexes
env_type = env.entry_point.split(':')[0].split('.')[-1]
env_type = env._entry_point.split(':')[0].split('.')[-1]
_game_envs[env_type].add(env.id)
# reading benchmark names directly from retro requires
@@ -113,7 +113,7 @@ def build_env(args):
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)
if env_type == 'mujoco':
env = VecNormalize(env, use_tf=True)
env = VecNormalize(env)
return env
@@ -126,7 +126,7 @@ def get_env_type(args):
# Re-parse the gym registry, since we could have new envs since last time.
for env in gym.envs.registry.all():
env_type = env.entry_point.split(':')[0].split('.')[-1]
env_type = env._entry_point.split(':')[0].split('.')[-1]
_game_envs[env_type].add(env.id) # This is a set so add is idempotent
if env_id in _game_envs.keys():
@@ -192,12 +192,6 @@ def parse_cmdline_kwargs(args):
return {k: parse(v) for k,v in parse_unknown_args(args).items()}
def configure_logger(log_path, **kwargs):
if log_path is not None:
logger.configure(log_path)
else:
logger.configure(**kwargs)
def main(args):
# configure logger, disable logging in child MPI processes (with rank > 0)
@@ -208,10 +202,10 @@ def main(args):
if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
rank = 0
configure_logger(args.log_path)
logger.configure()
else:
logger.configure(format_strs=[])
rank = MPI.COMM_WORLD.Get_rank()
configure_logger(args.log_path, format_strs=[])
model, env = train(args, extra_args)
@@ -226,7 +220,7 @@ def main(args):
state = model.initial_state if hasattr(model, 'initial_state') else None
dones = np.zeros((1,))
episode_rew = np.zeros(env.num_envs) if isinstance(env, VecEnv) else np.zeros(1)
episode_rew = 0
while True:
if state is not None:
actions, _, state, _ = model.step(obs,S=state, M=dones)
@@ -234,13 +228,13 @@ def main(args):
actions, _, _, _ = model.step(obs)
obs, rew, done, _ = env.step(actions)
episode_rew += rew
episode_rew += rew[0] if isinstance(env, VecEnv) else rew
env.render()
done_any = done.any() if isinstance(done, np.ndarray) else done
if done_any:
for i in np.nonzero(done)[0]:
print('episode_rew={}'.format(episode_rew[i]))
episode_rew[i] = 0
done = done.any() if isinstance(done, np.ndarray) else done
if done:
print('episode_rew={}'.format(episode_rew))
episode_rew = 0
obs = env.reset()
env.close()

View File

@@ -4,3 +4,4 @@ exclude =
.git,
__pycache__,
baselines/ppo1,
baselines/bench,

View File

@@ -12,9 +12,10 @@ extras = {
'filelock',
'pytest',
'pytest-forked',
'atari-py',
'matplotlib',
'pandas'
'atari-py'
],
'bullet': [
'pybullet',
],
'mpi': [
'mpi4py'
@@ -31,10 +32,12 @@ setup(name='baselines',
packages=[package for package in find_packages()
if package.startswith('baselines')],
install_requires=[
'gym>=0.15.4, <0.16.0',
'gym',
'scipy',
'tqdm',
'joblib',
'dill',
'progressbar2',
'cloudpickle',
'click',
'opencv-python'
@@ -44,7 +47,7 @@ setup(name='baselines',
author='OpenAI',
url='https://github.com/openai/baselines',
author_email='gym@openai.com',
version='0.1.6')
version='0.1.5')
# ensure there is some tensorflow build with version above 1.4