Merge branch 'internal' of github.com:openai/baselines into internal

Peter Zhokhov
2019-05-03 15:56:04 -07:00
24 changed files with 293 additions and 100 deletions

View File

@@ -11,4 +11,4 @@ install:
 script:
   - flake8 . --show-source --statistics
-  - docker run baselines-test pytest -v .
+  - docker run -e RUNSLOW=1 baselines-test pytest -v .

View File

@@ -25,7 +25,9 @@ def make_vec_env(env_id, env_type, num_env, seed,
                  start_index=0,
                  reward_scale=1.0,
                  flatten_dict_observations=True,
-                 gamestate=None):
+                 gamestate=None,
+                 initializer=None,
+                 force_dummy=False):
     """
     Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo.
     """
@@ -34,7 +36,7 @@ def make_vec_env(env_id, env_type, num_env, seed,
     mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
     seed = seed + 10000 * mpi_rank if seed is not None else None
     logger_dir = logger.get_dir()
-    def make_thunk(rank):
+    def make_thunk(rank, initializer=None):
         return lambda: make_env(
             env_id=env_id,
             env_type=env_type,
@@ -46,17 +48,21 @@ def make_vec_env(env_id, env_type, num_env, seed,
             flatten_dict_observations=flatten_dict_observations,
             wrapper_kwargs=wrapper_kwargs,
             env_kwargs=env_kwargs,
-            logger_dir=logger_dir
+            logger_dir=logger_dir,
+            initializer=initializer
         )
     set_global_seeds(seed)
-    if num_env > 1:
-        return SubprocVecEnv([make_thunk(i + start_index) for i in range(num_env)])
+    if not force_dummy and num_env > 1:
+        return SubprocVecEnv([make_thunk(i + start_index, initializer=initializer) for i in range(num_env)])
     else:
-        return DummyVecEnv([make_thunk(start_index)])
+        return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)])

-def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None):
+def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None, initializer=None):
+    if initializer is not None:
+        initializer(mpi_rank=mpi_rank, subrank=subrank)
+
     wrapper_kwargs = wrapper_kwargs or {}
     env_kwargs = env_kwargs or {}
     if ':' in env_id:
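
A minimal usage sketch for the new initializer and force_dummy arguments; the worker hook and environment choice below are illustrative assumptions, not part of this commit:

    from baselines.common.cmd_util import make_vec_env

    # hypothetical per-worker hook: make_env calls it (with mpi_rank and subrank)
    # inside each worker before the environment is constructed
    def worker_init(mpi_rank=0, subrank=0):
        print('starting env worker: mpi_rank=%d subrank=%d' % (mpi_rank, subrank))

    # force_dummy=True keeps every env in-process (DummyVecEnv), which is easier to debug
    venv = make_vec_env('PongNoFrameskip-v4', 'atari', num_env=4, seed=0,
                        initializer=worker_init, force_dummy=True)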

View File

@@ -3,7 +3,6 @@ import tensorflow as tf
 from baselines.a2c import utils
 from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch
 from baselines.common.mpi_running_mean_std import RunningMeanStd
-import tensorflow.contrib.layers as layers

 mapping = {}

@@ -26,6 +25,51 @@ def nature_cnn(unscaled_images, **conv_kwargs):
     h3 = conv_to_fc(h3)
     return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2)))

+def build_impala_cnn(unscaled_images, depths=[16,32,32], **conv_kwargs):
+    """
+    Model used in the paper "IMPALA: Scalable Distributed Deep-RL with
+    Importance Weighted Actor-Learner Architectures" https://arxiv.org/abs/1802.01561
+    """
+    layer_num = 0
+
+    def get_layer_num_str():
+        nonlocal layer_num
+        num_str = str(layer_num)
+        layer_num += 1
+        return num_str
+
+    def conv_layer(out, depth):
+        return tf.layers.conv2d(out, depth, 3, padding='same', name='layer_' + get_layer_num_str())
+
+    def residual_block(inputs):
+        depth = inputs.get_shape()[-1].value
+        out = tf.nn.relu(inputs)
+        out = conv_layer(out, depth)
+        out = tf.nn.relu(out)
+        out = conv_layer(out, depth)
+        return out + inputs
+
+    def conv_sequence(inputs, depth):
+        out = conv_layer(inputs, depth)
+        out = tf.layers.max_pooling2d(out, pool_size=3, strides=2, padding='same')
+        out = residual_block(out)
+        out = residual_block(out)
+        return out
+
+    out = tf.cast(unscaled_images, tf.float32) / 255.
+    for depth in depths:
+        out = conv_sequence(out, depth)
+    out = tf.layers.flatten(out)
+    out = tf.nn.relu(out)
+    out = tf.layers.dense(out, 256, activation=tf.nn.relu, name='layer_' + get_layer_num_str())
+
+    return out
+
 @register("mlp")
 def mlp(num_layers=2, num_hidden=64, activation=tf.tanh, layer_norm=False):

@@ -65,6 +109,11 @@ def cnn(**conv_kwargs):
         return nature_cnn(X, **conv_kwargs)
     return network_fn

+@register("impala_cnn")
+def impala_cnn(**conv_kwargs):
+    def network_fn(X):
+        return build_impala_cnn(X)
+    return network_fn

 @register("cnn_small")
 def cnn_small(**conv_kwargs):
@@ -79,7 +128,6 @@ def cnn_small(**conv_kwargs):
         return h
     return network_fn

 @register("lstm")
 def lstm(nlstm=128, layer_norm=False):
     """
@@ -136,12 +184,12 @@ def lstm(nlstm=128, layer_norm=False):

 @register("cnn_lstm")
-def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs):
+def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs):
     def network_fn(X, nenv=1):
         nbatch = X.shape[0]
         nsteps = nbatch // nenv

-        h = nature_cnn(X, **conv_kwargs)
+        h = conv_fn(X, **conv_kwargs)

         M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1)
         S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states
@@ -161,6 +209,9 @@ def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs):
     return network_fn

+@register("impala_cnn_lstm")
+def impala_cnn_lstm():
+    return cnn_lstm(nlstm=256, conv_fn=build_impala_cnn)

 @register("cnn_lnlstm")
 def cnn_lnlstm(nlstm=128, **conv_kwargs):
@@ -187,7 +238,7 @@ def conv_only(convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], **conv_kwargs):
         out = tf.cast(X, tf.float32) / 255.
         with tf.variable_scope("convnet"):
             for num_outputs, kernel_size, stride in convs:
-                out = layers.convolution2d(out,
+                out = tf.contrib.layers.convolution2d(out,
                                            num_outputs=num_outputs,
                                            kernel_size=kernel_size,
                                            stride=stride,
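
For reference, a small sketch of using the newly added IMPALA convnet; the placeholder shape is illustrative only:

    import tensorflow as tf
    from baselines.common.models import build_impala_cnn

    # apply the IMPALA residual convnet to a batch of stacked 84x84 frames
    X = tf.placeholder(tf.uint8, [None, 84, 84, 4])
    features = build_impala_cnn(X)  # [None, 256] feature tensor

Because the builders are registered, the networks should also be selectable by name (e.g. network='impala_cnn' or 'impala_cnn_lstm') wherever baselines looks up networks through this registry.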

View File

@@ -2,6 +2,7 @@ import numpy as np
 import tensorflow as tf
 from baselines.common import tf_util as U
 from baselines.common.tests.test_with_mpi import with_mpi
+from baselines import logger
 try:
     from mpi4py import MPI
 except ImportError:
@@ -9,22 +10,34 @@ except ImportError:

 class MpiAdamOptimizer(tf.train.AdamOptimizer):
     """Adam optimizer that averages gradients across mpi processes."""
-    def __init__(self, comm, **kwargs):
+    def __init__(self, comm, grad_clip=None, mpi_rank_weight=1, **kwargs):
         self.comm = comm
+        self.grad_clip = grad_clip
+        self.mpi_rank_weight = mpi_rank_weight
         tf.train.AdamOptimizer.__init__(self, **kwargs)

     def compute_gradients(self, loss, var_list, **kwargs):
         grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs)
         grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None]
-        flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0)
+        flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) * self.mpi_rank_weight
         shapes = [v.shape.as_list() for g, v in grads_and_vars]
         sizes = [int(np.prod(s)) for s in shapes]
-        num_tasks = self.comm.Get_size()
+
+        total_weight = np.zeros(1, np.float32)
+        self.comm.Allreduce(np.array([self.mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)
+        total_weight = total_weight[0]
+
         buf = np.zeros(sum(sizes), np.float32)
         countholder = [0] # Counts how many times _collect_grads has been called
         stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable
         def _collect_grads(flat_grad, np_stat):
+            if self.grad_clip is not None:
+                gradnorm = np.linalg.norm(flat_grad)
+                if gradnorm > 1:
+                    flat_grad /= gradnorm
+                logger.logkv_mean('gradnorm', gradnorm)
+                logger.logkv_mean('gradclipfrac', float(gradnorm > 1))
             self.comm.Allreduce(flat_grad, buf, op=MPI.SUM)
-            np.divide(buf, float(num_tasks), out=buf)
+            np.divide(buf, float(total_weight), out=buf)
             if countholder[0] % 100 == 0:
                 check_synced(np_stat, self.comm)
             countholder[0] += 1
@@ -51,8 +64,8 @@ def check_synced(localval, comm=None):
     comm = comm or MPI.COMM_WORLD
     vals = comm.gather(localval)
     if comm.rank == 0:
-        assert all(val==vals[0] for val in vals[1:])
+        assert all(val==vals[0] for val in vals[1:]),\
+            f'MpiAdamOptimizer detected that different workers have different weights: {vals}'

 @with_mpi(timeout=5)
 def test_nonfreeze():
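
A hedged sketch of how the new optimizer options might be used; the rank split below is purely illustrative:

    from mpi4py import MPI
    from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer

    # workers with mpi_rank_weight=0 still join the allreduce but contribute nothing
    # to the averaged gradient (e.g. evaluation-only processes); when grad_clip is set,
    # any per-worker gradient with norm > 1 is renormalized, and gradnorm/gradclipfrac are logged
    is_eval_worker = MPI.COMM_WORLD.Get_rank() == MPI.COMM_WORLD.Get_size() - 1
    trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=3e-4, epsilon=1e-5,
                               grad_clip=1.0,
                               mpi_rank_weight=0 if is_eval_worker else 1)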

View File

@@ -248,7 +248,10 @@ def plot_results(
     figsize=None,
     legend_outside=False,
     resample=0,
-    smooth_step=1.0
+    smooth_step=1.0,
+    tiling='vertical',
+    xlabel=None,
+    ylabel=None
 ):
     '''
     Plot multiple Results objects
@@ -300,9 +303,23 @@ def plot_results(
         sk2r[splitkey].append(result)
     assert len(sk2r) > 0
     assert isinstance(resample, int), "0: don't resample. <integer>: that many samples"
-    nrows = len(sk2r)
-    ncols = 1
-    figsize = figsize or (6, 6 * nrows)
+    if tiling == 'vertical' or tiling is None:
+        nrows = len(sk2r)
+        ncols = 1
+    elif tiling == 'horizontal':
+        ncols = len(sk2r)
+        nrows = 1
+    elif tiling == 'symmetric':
+        import math
+        N = len(sk2r)
+        largest_divisor = 1
+        for i in range(1, int(math.sqrt(N))+1):
+            if N % i == 0:
+                largest_divisor = i
+        ncols = largest_divisor
+        nrows = N // ncols
+    figsize = figsize or (6 * ncols, 6 * nrows)
     f, axarr = plt.subplots(nrows, ncols, sharex=False, squeeze=False, figsize=figsize)

     groups = list(set(group_fn(result) for result in allresults))
@@ -316,7 +333,9 @@ def plot_results(
         g2c = defaultdict(int)
         sresults = sk2r[sk]
         gresults = defaultdict(list)
-        ax = axarr[isplit][0]
+        idx_row = isplit // ncols
+        idx_col = isplit % ncols
+        ax = axarr[idx_row][idx_col]
         for result in sresults:
             group = group_fn(result)
             g2c[group] += 1
@@ -355,7 +374,7 @@ def plot_results(
                 ymean = np.mean(ys, axis=0)
                 ystd = np.std(ys, axis=0)
                 ystderr = ystd / np.sqrt(len(ys))
-                l, = axarr[isplit][0].plot(usex, ymean, color=color)
+                l, = axarr[idx_row][idx_col].plot(usex, ymean, color=color)
                 g2l[group] = l
                 if shaded_err:
                     ax.fill_between(usex, ymean - ystderr, ymean + ystderr, color=color, alpha=.4)
@@ -372,6 +391,17 @@ def plot_results(
                 loc=2 if legend_outside else None,
                 bbox_to_anchor=(1,1) if legend_outside else None)
         ax.set_title(sk)
+    # add xlabels, but only to the bottom row
+    if xlabel is not None:
+        for ax in axarr[-1]:
+            plt.sca(ax)
+            plt.xlabel(xlabel)
+    # add ylabels, but only to left column
+    if ylabel is not None:
+        for ax in axarr[:,0]:
+            plt.sca(ax)
+            plt.ylabel(ylabel)
     return f, axarr

 def regression_analysis(df):
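
A quick usage sketch for the new tiling and axis-label options (the log directory is a placeholder):

    from baselines.common import plot_util as pu

    # one subplot per split key, tiled in a near-square grid,
    # with labels only on the outer axes
    results = pu.load_results('~/logs/my-runs')
    fig, axes = pu.plot_results(results, tiling='symmetric',
                                xlabel='timesteps', ylabel='episode reward')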

View File

@@ -0,0 +1,2 @@
+import os, pytest
+mark_slow = pytest.mark.skipif(not os.getenv('RUNSLOW'), reason='slow')
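
With this marker, the slow test modules below are skipped unless the RUNSLOW environment variable is set, which matches the "docker run -e RUNSLOW=1 baselines-test pytest -v ." change to the CI script above.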

View File

@@ -9,18 +9,16 @@ class FixedSequenceEnv(Env):
         n_actions=10,
         episode_len=100
     ):
+        self.np_random = np.random.RandomState()
+        self.sequence = None
+
         self.action_space = Discrete(n_actions)
         self.observation_space = Discrete(1)
-        self.np_random = np.random.RandomState(0)
         self.episode_len = episode_len
-        self.sequence = [self.np_random.randint(0, self.action_space.n)
-                         for _ in range(self.episode_len)]
         self.time = 0

     def reset(self):
+        if self.sequence is None:
+            self.sequence = [self.np_random.randint(0, self.action_space.n-1) for _ in range(self.episode_len)]
         self.time = 0
         return 0

@@ -29,7 +27,6 @@ class FixedSequenceEnv(Env):
         self._choose_next_state()
         done = False
         if self.episode_len and self.time >= self.episode_len:
-            rew = 0
             done = True

         return 0, rew, done, {}

View File

@@ -2,43 +2,45 @@ import numpy as np
 from abc import abstractmethod
 from gym import Env
 from gym.spaces import MultiDiscrete, Discrete, Box
+from collections import deque


 class IdentityEnv(Env):
     def __init__(
             self,
-            episode_len=None
+            episode_len=None,
+            delay=0,
+            zero_first_rewards=True
     ):

         self.observation_space = self.action_space
         self.episode_len = episode_len
         self.time = 0
-        self.reset()
+        self.delay = delay
+        self.zero_first_rewards = zero_first_rewards
+        self.q = deque(maxlen=delay+1)

     def reset(self):
-        self._choose_next_state()
+        self.q.clear()
+        for _ in range(self.delay + 1):
+            self.q.append(self.action_space.sample())
         self.time = 0
-        return self.state
+        return self.q[-1]

     def step(self, actions):
-        rew = self._get_reward(actions)
-        self._choose_next_state()
-        done = False
-        if self.episode_len and self.time >= self.episode_len:
-            done = True
-        return self.state, rew, done, {}
+        rew = self._get_reward(self.q.popleft(), actions)
+        if self.zero_first_rewards and self.time < self.delay:
+            rew = 0
+        self.q.append(self.action_space.sample())
+        self.time += 1
+        done = self.episode_len is not None and self.time >= self.episode_len
+        return self.q[-1], rew, done, {}

     def seed(self, seed=None):
         self.action_space.seed(seed)

-    def _choose_next_state(self):
-        self.state = self.action_space.sample()
-        self.time += 1

     @abstractmethod
-    def _get_reward(self, actions):
+    def _get_reward(self, state, actions):
         raise NotImplementedError

@@ -47,26 +49,29 @@ class DiscreteIdentityEnv(IdentityEnv):
             self,
             dim,
             episode_len=None,
+            delay=0,
+            zero_first_rewards=True
     ):

         self.action_space = Discrete(dim)
-        super().__init__(episode_len=episode_len)
+        super().__init__(episode_len=episode_len, delay=delay, zero_first_rewards=zero_first_rewards)

-    def _get_reward(self, actions):
-        return 1 if self.state == actions else 0
+    def _get_reward(self, state, actions):
+        return 1 if state == actions else 0


 class MultiDiscreteIdentityEnv(IdentityEnv):
     def __init__(
             self,
             dims,
             episode_len=None,
+            delay=0,
     ):

         self.action_space = MultiDiscrete(dims)
-        super().__init__(episode_len=episode_len)
+        super().__init__(episode_len=episode_len, delay=delay)

-    def _get_reward(self, actions):
-        return 1 if all(self.state == actions) else 0
+    def _get_reward(self, state, actions):
+        return 1 if all(state == actions) else 0


 class BoxIdentityEnv(IdentityEnv):
@@ -79,7 +84,7 @@ class BoxIdentityEnv(IdentityEnv):
         self.action_space = Box(low=-1.0, high=1.0, shape=shape, dtype=np.float32)
         super().__init__(episode_len=episode_len)

-    def _get_reward(self, actions):
-        diff = actions - self.state
+    def _get_reward(self, state, actions):
+        diff = actions - state
         diff = diff[:]
         return -0.5 * np.dot(diff, diff)

View File

@@ -0,0 +1,36 @@
+from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv
+
+def test_discrete_nodelay():
+    nsteps = 100
+    eplen = 50
+    env = DiscreteIdentityEnv(10, episode_len=eplen)
+    ob = env.reset()
+    for t in range(nsteps):
+        action = env.action_space.sample()
+        next_ob, rew, done, info = env.step(action)
+        assert rew == (1 if action == ob else 0)
+        if (t + 1) % eplen == 0:
+            assert done
+            next_ob = env.reset()
+        else:
+            assert not done
+        ob = next_ob
+
+
+def test_discrete_delay1():
+    eplen = 50
+    env = DiscreteIdentityEnv(10, episode_len=eplen, delay=1)
+    ob = env.reset()
+    prev_ob = None
+    for t in range(eplen):
+        action = env.action_space.sample()
+        next_ob, rew, done, info = env.step(action)
+        if t > 0:
+            assert rew == (1 if action == prev_ob else 0)
+        else:
+            assert rew == 0
+        prev_ob = ob
+        ob = next_ob
+        if t < eplen - 1:
+            assert not done
+    assert done

View File

@@ -3,6 +3,7 @@ import gym

 from baselines.run import get_learn_function
 from baselines.common.tests.util import reward_per_episode_test
+from baselines.common.tests import mark_slow

 common_kwargs = dict(
     total_timesteps=30000,
@@ -20,7 +21,7 @@ learn_kwargs = {
     'trpo_mpi': {}
 }

-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", learn_kwargs.keys())
 def test_cartpole(alg):
     '''

View File

@@ -3,6 +3,7 @@ import gym

 from baselines.run import get_learn_function
 from baselines.common.tests.util import reward_per_episode_test
+from baselines.common.tests import mark_slow

 pytest.importorskip('mujoco_py')

@@ -15,7 +16,7 @@ learn_kwargs = {
     'her': dict(total_timesteps=2000)
 }

-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", learn_kwargs.keys())
 def test_fetchreach(alg):
     '''

View File

@@ -3,6 +3,8 @@ from baselines.common.tests.envs.fixed_sequence_env import FixedSequenceEnv
 from baselines.common.tests.util import simple_test
 from baselines.run import get_learn_function
+from baselines.common.tests import mark_slow

 common_kwargs = dict(
     seed=0,
@@ -21,7 +23,7 @@ learn_kwargs = {
 alg_list = learn_kwargs.keys()
 rnn_list = ['lstm']

-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", alg_list)
 @pytest.mark.parametrize("rnn", rnn_list)
 def test_fixed_sequence(alg, rnn):

View File

@@ -2,6 +2,7 @@ import pytest
 from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv, BoxIdentityEnv, MultiDiscreteIdentityEnv
 from baselines.run import get_learn_function
 from baselines.common.tests.util import simple_test
+from baselines.common.tests import mark_slow

 common_kwargs = dict(
     total_timesteps=30000,
@@ -24,7 +25,7 @@ algos_disc = ['a2c', 'acktr', 'deepq', 'ppo2', 'trpo_mpi']
 algos_multidisc = ['a2c', 'acktr', 'ppo2', 'trpo_mpi']
 algos_cont = ['a2c', 'acktr', 'ddpg', 'ppo2', 'trpo_mpi']

-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", algos_disc)
 def test_discrete_identity(alg):
     '''
@@ -39,7 +40,7 @@ def test_discrete_identity(alg):
     env_fn = lambda: DiscreteIdentityEnv(10, episode_len=100)
     simple_test(env_fn, learn_fn, 0.9)

-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", algos_multidisc)
 def test_multidiscrete_identity(alg):
     '''
@@ -54,7 +55,7 @@ def test_multidiscrete_identity(alg):
     env_fn = lambda: MultiDiscreteIdentityEnv((3,3), episode_len=100)
     simple_test(env_fn, learn_fn, 0.9)

-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", algos_cont)
 def test_continuous_identity(alg):
     '''

View File

@@ -4,7 +4,7 @@ import pytest
 from baselines.common.tests.envs.mnist_env import MnistEnv
 from baselines.common.tests.util import simple_test
 from baselines.run import get_learn_function
+from baselines.common.tests import mark_slow

 # TODO investigate a2c and ppo2 failures - is it due to bad hyperparameters for this problem?
 # GitHub issue https://github.com/openai/baselines/issues/189
@@ -28,7 +28,7 @@ learn_args = {
 #tests pass, but are too slow on travis. Same algorithms are covered
 # by other tests with less compute-hungry nn's and by benchmarks
 @pytest.mark.skip
-@pytest.mark.slow
+@mark_slow
 @pytest.mark.parametrize("alg", learn_args.keys())
 def test_mnist(alg):
     '''

View File

@@ -0,0 +1,17 @@
+# smoke tests of plot_util
+from baselines.common import plot_util as pu
+from baselines.common.tests.util import smoketest
+
+
+def test_plot_util():
+    nruns = 4
+    logdirs = [smoketest('--alg=ppo2 --env=CartPole-v0 --num_timesteps=10000') for _ in range(nruns)]
+    data = pu.load_results(logdirs)
+    assert len(data) == 4
+
+    _, axes = pu.plot_results(data[:1]); assert len(axes) == 1
+    _, axes = pu.plot_results(data, tiling='vertical'); assert axes.shape==(4,1)
+    _, axes = pu.plot_results(data, tiling='horizontal'); assert axes.shape==(1,4)
+    _, axes = pu.plot_results(data, tiling='symmetric'); assert axes.shape==(2,2)
+    _, axes = pu.plot_results(data, split_fn=lambda _: ''); assert len(axes) == 1

View File

@@ -4,6 +4,7 @@ import subprocess
 import cloudpickle
 import base64
 import pytest
+from functools import wraps

 try:
     from mpi4py import MPI
@@ -12,6 +13,7 @@ except ImportError:

 def with_mpi(nproc=2, timeout=30, skip_if_no_mpi=True):
     def outer_thunk(fn):
+        @wraps(fn)
         def thunk(*args, **kwargs):
             serialized_fn = base64.b64encode(cloudpickle.dumps(lambda: fn(*args, **kwargs)))
             subprocess.check_call([

View File

@@ -77,3 +77,16 @@ def rollout(env, model, n_trials):
         observations.append(episode_obs)

     return observations, actions, rewards
+
+def smoketest(argstr, **kwargs):
+    import tempfile
+    import subprocess
+    import os
+    argstr = 'python -m baselines.run ' + argstr
+    for key, value in kwargs:
+        argstr += ' --{}={}'.format(key, value)
+    tempdir = tempfile.mkdtemp()
+    env = os.environ.copy()
+    env['OPENAI_LOGDIR'] = tempdir
+    subprocess.run(argstr.split(' '), env=env)
+    return tempdir
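
Note that, as written, "for key, value in kwargs:" iterates over the dict's keys rather than key/value pairs, so the kwargs path of this helper would not work as intended; the new tests only call smoketest with a plain argstr.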

View File

@@ -33,7 +33,7 @@ class VecMonitor(VecEnvWrapper):
         self.eprets += rews
         self.eplens += 1

-        newinfos = infos[:]
+        newinfos = list(infos[:])
         for i in range(len(dones)):
             if dones[i]:
                 info = infos[i].copy()

View File

@@ -1,10 +1,6 @@
-from multiprocessing import Process
-import baselines.run
+from baselines.common.tests.util import smoketest


 def _run(argstr):
-    p = Process(target=baselines.run.main, args=('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr).split(' '))
-    p.start()
-    p.join()
+    smoketest('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr)

 def test_popart():
     _run('--normalize_returns=True --popart=True')

View File

@@ -38,8 +38,8 @@ class HumanOutputFormat(KVWriter, SeqWriter):
         # Create strings for printing
         key2str = {}
         for (key, val) in sorted(kvs.items()):
-            if isinstance(val, float):
-                valstr = '%-8.3g' % (val,)
+            if hasattr(val, '__float__'):
+                valstr = '%-8.3g' % val
             else:
                 valstr = str(val)
             key2str[self._truncate(key)] = self._truncate(valstr)
@@ -92,7 +92,6 @@ class JSONOutputFormat(KVWriter):
     def writekvs(self, kvs):
         for k, v in sorted(kvs.items()):
             if hasattr(v, 'dtype'):
-                v = v.tolist()
                 kvs[k] = float(v)
         self.file.write(json.dumps(kvs) + '\n')
         self.file.flush()
@@ -361,6 +360,15 @@ class Logger(object):
             if isinstance(fmt, SeqWriter):
                 fmt.writeseq(map(str, args))

+def get_rank_without_mpi_import():
+    # check environment variables here instead of importing mpi4py
+    # to avoid calling MPI_Init() when this module is imported
+    for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']:
+        if varname in os.environ:
+            return int(os.environ[varname])
+    return 0
+
+
 def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
     """
     If comm is provided, average all numerical stats across that comm
@@ -373,12 +381,7 @@ def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
     assert isinstance(dir, str)
     os.makedirs(dir, exist_ok=True)

-    rank = 0
-    # check environment variables here instead of importing mpi4py
-    # to avoid calling MPI_Init() when this module is imported
-    for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']:
-        if varname in os.environ:
-            rank = int(os.environ[varname])
+    rank = get_rank_without_mpi_import()
     if rank > 0:
         log_suffix = log_suffix + "-rank%03i" % rank
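
Since numpy scalars define __float__, they now get the same compact '%-8.3g' formatting as Python floats; a tiny illustrative sketch:

    import numpy as np
    from baselines import logger

    logger.logkv('eprewmean', np.float32(21.3333))  # formatted numerically, not via str()
    logger.dumpkvs()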

View File

@@ -8,7 +8,7 @@ class MicrobatchedModel(Model):
     on the entire minibatch causes some overflow
     """
     def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
-                nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size):
+                nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight, comm, microbatch_size):

         self.nmicrobatches = nbatch_train // microbatch_size
         self.microbatch_size = microbatch_size
@@ -23,7 +23,9 @@ class MicrobatchedModel(Model):
             nsteps=nsteps,
             ent_coef=ent_coef,
             vf_coef=vf_coef,
-            max_grad_norm=max_grad_norm)
+            max_grad_norm=max_grad_norm,
+            mpi_rank_weight=mpi_rank_weight,
+            comm=comm)

         self.grads_ph = [tf.placeholder(dtype=g.dtype, shape=g.shape) for g in self.grads]
         grads_ph_and_vars = list(zip(self.grads_ph, self.var))

View File

@@ -25,9 +25,12 @@ class Model(object):
     - Save load the model
     """
     def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
-                nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size=None):
+                nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight=1, comm=None, microbatch_size=None):
         self.sess = sess = get_session()

+        if MPI is not None and comm is None:
+            comm = MPI.COMM_WORLD
+
         with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE):
             # CREATE OUR TWO MODELS
             # act_model that is used for sampling
@@ -91,8 +94,8 @@ class Model(object):
         # 1. Get the model parameters
         params = tf.trainable_variables('ppo2_model')
         # 2. Build our trainer
-        if MPI is not None:
-            self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
+        if comm is not None and comm.Get_size() > 1:
+            self.trainer = MpiAdamOptimizer(comm, learning_rate=LR, mpi_rank_weight=mpi_rank_weight, epsilon=1e-5)
         else:
             self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
         # 3. Calculate the gradients
@@ -125,7 +128,7 @@ class Model(object):
         initialize()
         global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
         if MPI is not None:
-            sync_from_root(sess, global_variables) #pylint: disable=E1101
+            sync_from_root(sess, global_variables, comm=comm) #pylint: disable=E1101

     def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
         # Here we calculate advantage A(s,a) = R + yV(s') - V(s)

View File

@@ -21,7 +21,7 @@ def constfn(val):
 def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4,
             vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
             log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
-            save_interval=0, load_path=None, model_fn=None, **network_kwargs):
+            save_interval=0, load_path=None, model_fn=None, update_fn=None, init_fn=None, mpi_rank_weight=1, comm=None, **network_kwargs):
     '''
     Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347)
@@ -97,6 +97,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
     # Calculate the batch_size
     nbatch = nenvs * nsteps
     nbatch_train = nbatch // nminibatches
+    is_mpi_root = (MPI is None or MPI.COMM_WORLD.Get_rank() == 0)

     # Instantiate the model object (that creates act_model and train_model)
     if model_fn is None:
@@ -105,7 +106,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
     model = model_fn(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=nenvs, nbatch_train=nbatch_train,
                     nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef,
-                    max_grad_norm=max_grad_norm)
+                    max_grad_norm=max_grad_norm, comm=comm, mpi_rank_weight=mpi_rank_weight)

     if load_path is not None:
         model.load(load_path)
@@ -118,6 +119,9 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
     if eval_env is not None:
         eval_epinfobuf = deque(maxlen=100)

+    if init_fn is not None:
+        init_fn()
+
     # Start total timer
     tfirststart = time.perf_counter()
@@ -131,11 +135,16 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
         lrnow = lr(frac)
         # Calculate the cliprange
         cliprangenow = cliprange(frac)
+
+        if update % log_interval == 0 and is_mpi_root: logger.info('Stepping environment...')
+
         # Get minibatch
         obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632
         if eval_env is not None:
             eval_obs, eval_returns, eval_masks, eval_actions, eval_values, eval_neglogpacs, eval_states, eval_epinfos = eval_runner.run() #pylint: disable=E0632
+
+        if update % log_interval == 0 and is_mpi_root: logger.info('Done.')

         epinfobuf.extend(epinfos)
         if eval_env is not None:
             eval_epinfobuf.extend(eval_epinfos)
@@ -176,31 +185,36 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
         tnow = time.perf_counter()
         # Calculate the fps (frame per second)
         fps = int(nbatch / (tnow - tstart))
+
+        if update_fn is not None:
+            update_fn(update)
+
         if update % log_interval == 0 or update == 1:
             # Calculates if value function is a good predicator of the returns (ev > 1)
             # or if it's just worse than predicting nothing (ev =< 0)
             ev = explained_variance(values, returns)
-            logger.logkv("serial_timesteps", update*nsteps)
-            logger.logkv("nupdates", update)
-            logger.logkv("total_timesteps", update*nbatch)
+            logger.logkv("misc/serial_timesteps", update*nsteps)
+            logger.logkv("misc/nupdates", update)
+            logger.logkv("misc/total_timesteps", update*nbatch)
             logger.logkv("fps", fps)
-            logger.logkv("explained_variance", float(ev))
+            logger.logkv("misc/explained_variance", float(ev))
             logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf]))
             logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf]))
             if eval_env is not None:
                 logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]) )
                 logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]) )
-            logger.logkv('time_elapsed', tnow - tfirststart)
+            logger.logkv('misc/time_elapsed', tnow - tfirststart)
             for (lossval, lossname) in zip(lossvals, model.loss_names):
-                logger.logkv(lossname, lossval)
-            if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
-                logger.dumpkvs()
-        if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
+                logger.logkv('loss/' + lossname, lossval)
+
+            logger.dumpkvs()
+
+        if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and is_mpi_root:
             checkdir = osp.join(logger.get_dir(), 'checkpoints')
             os.makedirs(checkdir, exist_ok=True)
             savepath = osp.join(checkdir, '%.5i'%update)
             print('Saving to', savepath)
             model.save(savepath)
+
     return model

 # Avoid division error when calculate the mean (in our case if epinfo is empty returns np.nan, not return an error)
 def safemean(xs):
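
A hedged sketch of wiring up the new learn() hooks; the callback bodies and the env choice are illustrative assumptions, not part of this commit:

    from baselines.ppo2 import ppo2

    def init_fn():
        # called once, right before the training loop starts
        print('training is starting')

    def update_fn(update):
        # called after every optimization update, before logging
        if update % 100 == 0:
            print('completed update', update)

    # venv: a vectorized env, e.g. built with make_vec_env as above
    model = ppo2.learn(network='cnn', env=venv, total_timesteps=int(1e6),
                       init_fn=init_fn, update_fn=update_fn,
                       mpi_rank_weight=1)  # pass 0 on evaluation-only workers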

View File

@@ -36,8 +36,6 @@ setup(name='baselines',
         'scipy',
         'tqdm',
         'joblib',
-        'dill',
-        'progressbar2',
         'cloudpickle',
         'click',
         'opencv-python'