diff --git a/.travis.yml b/.travis.yml index 712fc84..c68bfc1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,4 +11,4 @@ install: script: - flake8 . --show-source --statistics - - docker run baselines-test pytest -v . + - docker run -e RUNSLOW=1 baselines-test pytest -v . diff --git a/baselines/acktr/acktr.py b/baselines/acktr/acktr.py index 85447e8..69011e6 100644 --- a/baselines/acktr/acktr.py +++ b/baselines/acktr/acktr.py @@ -92,7 +92,7 @@ class Model(object): self.initial_state = step_model.initial_state tf.global_variables_initializer().run(session=sess) -def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=1, nprocs=32, nsteps=20, +def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=100, nprocs=32, nsteps=20, ent_coef=0.01, vf_coef=0.5, vf_fisher_coef=1.0, lr=0.25, max_grad_norm=0.5, kfac_clip=0.001, save_interval=None, lrschedule='linear', load_path=None, is_async=True, **network_kwargs): set_global_seeds(seed) diff --git a/baselines/acktr/kfac.py b/baselines/acktr/kfac.py index fac84f0..3d4a8c2 100644 --- a/baselines/acktr/kfac.py +++ b/baselines/acktr/kfac.py @@ -11,7 +11,7 @@ KFAC_DEBUG = False class KfacOptimizer(): - + # note that KfacOptimizer will be truly synchronous (and thus deterministic) only if a single-threaded session is used def __init__(self, learning_rate=0.01, momentum=0.9, clip_kl=0.01, kfac_update=2, stats_accum_iter=60, full_stats_init=False, cold_iter=100, cold_lr=None, is_async=False, async_stats=False, epsilon=1e-2, stats_decay=0.95, blockdiag_bias=False, channel_fac=False, factored_damping=False, approxT2=False, use_float64=False, weight_decay_dict={},max_grad_norm=0.5): self.max_grad_norm = max_grad_norm self._lr = learning_rate diff --git a/baselines/common/atari_wrappers.py b/baselines/common/atari_wrappers.py index 585081f..fb768e8 100644 --- a/baselines/common/atari_wrappers.py +++ b/baselines/common/atari_wrappers.py @@ -130,27 +130,60 @@ class ClipRewardEnv(gym.RewardWrapper): """Bin reward to {+1, 0, -1} by its sign.""" return np.sign(reward) -class WarpFrame(gym.ObservationWrapper): - def __init__(self, env, width=84, height=84, grayscale=True): - """Warp frames to 84x84 as done in the Nature paper and later work.""" - gym.ObservationWrapper.__init__(self, env) - self.width = width - self.height = height - self.grayscale = grayscale - if self.grayscale: - self.observation_space = spaces.Box(low=0, high=255, - shape=(self.height, self.width, 1), dtype=np.uint8) - else: - self.observation_space = spaces.Box(low=0, high=255, - shape=(self.height, self.width, 3), dtype=np.uint8) - def observation(self, frame): - if self.grayscale: +class WarpFrame(gym.ObservationWrapper): + def __init__(self, env, width=84, height=84, grayscale=True, dict_space_key=None): + """ + Warp frames to 84x84 as done in the Nature paper and later work. + + If the environment uses dictionary observations, `dict_space_key` can be specified which indicates which + observation should be warped. 
+ """ + super().__init__(env) + self._width = width + self._height = height + self._grayscale = grayscale + self._key = dict_space_key + if self._grayscale: + num_colors = 1 + else: + num_colors = 3 + + new_space = gym.spaces.Box( + low=0, + high=255, + shape=(self._height, self._width, num_colors), + dtype=np.uint8, + ) + if self._key is None: + original_space = self.observation_space + self.observation_space = new_space + else: + original_space = self.observation_space.spaces[self._key] + self.observation_space.spaces[self._key] = new_space + assert original_space.dtype == np.uint8 and len(original_space.shape) == 3 + + def observation(self, obs): + if self._key is None: + frame = obs + else: + frame = obs[self._key] + + if self._grayscale: frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) - frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA) - if self.grayscale: + frame = cv2.resize( + frame, (self._width, self._height), interpolation=cv2.INTER_AREA + ) + if self._grayscale: frame = np.expand_dims(frame, -1) - return frame + + if self._key is None: + obs = frame + else: + obs = obs.copy() + obs[self._key] = frame + return obs + class FrameStack(gym.Wrapper): def __init__(self, env, k): diff --git a/baselines/common/cmd_util.py b/baselines/common/cmd_util.py index 47e96ff..28cb0a7 100644 --- a/baselines/common/cmd_util.py +++ b/baselines/common/cmd_util.py @@ -17,21 +17,26 @@ from baselines.common.atari_wrappers import make_atari, wrap_deepmind from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv from baselines.common.vec_env.dummy_vec_env import DummyVecEnv from baselines.common import retro_wrappers +from baselines.common.wrappers import ClipActionsWrapper def make_vec_env(env_id, env_type, num_env, seed, wrapper_kwargs=None, + env_kwargs=None, start_index=0, reward_scale=1.0, flatten_dict_observations=True, - gamestate=None): + gamestate=None, + initializer=None, + force_dummy=False): """ Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo. 
""" wrapper_kwargs = wrapper_kwargs or {} + env_kwargs = env_kwargs or {} mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0 seed = seed + 10000 * mpi_rank if seed is not None else None logger_dir = logger.get_dir() - def make_thunk(rank): + def make_thunk(rank, initializer=None): return lambda: make_env( env_id=env_id, env_type=env_type, @@ -42,18 +47,30 @@ def make_vec_env(env_id, env_type, num_env, seed, gamestate=gamestate, flatten_dict_observations=flatten_dict_observations, wrapper_kwargs=wrapper_kwargs, - logger_dir=logger_dir + env_kwargs=env_kwargs, + logger_dir=logger_dir, + initializer=initializer ) set_global_seeds(seed) - if num_env > 1: - return SubprocVecEnv([make_thunk(i + start_index) for i in range(num_env)]) + if not force_dummy and num_env > 1: + return SubprocVecEnv([make_thunk(i + start_index, initializer=initializer) for i in range(num_env)]) else: - return DummyVecEnv([make_thunk(start_index)]) + return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)]) -def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, logger_dir=None): +def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None, initializer=None): + if initializer is not None: + initializer(mpi_rank=mpi_rank, subrank=subrank) + wrapper_kwargs = wrapper_kwargs or {} + env_kwargs = env_kwargs or {} + if ':' in env_id: + import re + import importlib + module_name = re.sub(':.*','',env_id) + env_id = re.sub('.*:', '', env_id) + importlib.import_module(module_name) if env_type == 'atari': env = make_atari(env_id) elif env_type == 'retro': @@ -61,7 +78,7 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1. gamestate = gamestate or retro.State.DEFAULT env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate) else: - env = gym.make(env_id) + env = gym.make(env_id, **env_kwargs) if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict): keys = env.observation_space.spaces.keys() @@ -72,6 +89,7 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1. logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)), allow_early_resets=True) + if env_type == 'atari': env = wrap_deepmind(env, **wrapper_kwargs) elif env_type == 'retro': @@ -79,6 +97,9 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1. 
wrapper_kwargs['frame_stack'] = 1 env = retro_wrappers.wrap_deepmind_retro(env, **wrapper_kwargs) + if isinstance(env.action_space, gym.spaces.Box): + env = ClipActionsWrapper(env) + if reward_scale != 1: env = retro_wrappers.RewardScaler(env, reward_scale) diff --git a/baselines/common/models.py b/baselines/common/models.py index 0003079..a6fe467 100644 --- a/baselines/common/models.py +++ b/baselines/common/models.py @@ -3,7 +3,6 @@ import tensorflow as tf from baselines.a2c import utils from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch from baselines.common.mpi_running_mean_std import RunningMeanStd -import tensorflow.contrib.layers as layers mapping = {} @@ -26,6 +25,51 @@ def nature_cnn(unscaled_images, **conv_kwargs): h3 = conv_to_fc(h3) return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2))) +def build_impala_cnn(unscaled_images, depths=[16,32,32], **conv_kwargs): + """ + Model used in the paper "IMPALA: Scalable Distributed Deep-RL with + Importance Weighted Actor-Learner Architectures" https://arxiv.org/abs/1802.01561 + """ + + layer_num = 0 + + def get_layer_num_str(): + nonlocal layer_num + num_str = str(layer_num) + layer_num += 1 + return num_str + + def conv_layer(out, depth): + return tf.layers.conv2d(out, depth, 3, padding='same', name='layer_' + get_layer_num_str()) + + def residual_block(inputs): + depth = inputs.get_shape()[-1].value + + out = tf.nn.relu(inputs) + + out = conv_layer(out, depth) + out = tf.nn.relu(out) + out = conv_layer(out, depth) + return out + inputs + + def conv_sequence(inputs, depth): + out = conv_layer(inputs, depth) + out = tf.layers.max_pooling2d(out, pool_size=3, strides=2, padding='same') + out = residual_block(out) + out = residual_block(out) + return out + + out = tf.cast(unscaled_images, tf.float32) / 255. + + for depth in depths: + out = conv_sequence(out, depth) + + out = tf.layers.flatten(out) + out = tf.nn.relu(out) + out = tf.layers.dense(out, 256, activation=tf.nn.relu, name='layer_' + get_layer_num_str()) + + return out + @register("mlp") def mlp(num_layers=2, num_hidden=64, activation=tf.tanh, layer_norm=False): @@ -65,6 +109,11 @@ def cnn(**conv_kwargs): return nature_cnn(X, **conv_kwargs) return network_fn +@register("impala_cnn") +def impala_cnn(**conv_kwargs): + def network_fn(X): + return build_impala_cnn(X) + return network_fn @register("cnn_small") def cnn_small(**conv_kwargs): @@ -79,7 +128,6 @@ def cnn_small(**conv_kwargs): return h return network_fn - @register("lstm") def lstm(nlstm=128, layer_norm=False): """ @@ -136,12 +184,12 @@ def lstm(nlstm=128, layer_norm=False): @register("cnn_lstm") -def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs): +def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs): def network_fn(X, nenv=1): nbatch = X.shape[0] nsteps = nbatch // nenv - h = nature_cnn(X, **conv_kwargs) + h = conv_fn(X, **conv_kwargs) M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states @@ -161,6 +209,9 @@ def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs): return network_fn +@register("impala_cnn_lstm") +def impala_cnn_lstm(): + return cnn_lstm(nlstm=256, conv_fn=build_impala_cnn) @register("cnn_lnlstm") def cnn_lnlstm(nlstm=128, **conv_kwargs): @@ -187,7 +238,7 @@ def conv_only(convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], **conv_kwargs): out = tf.cast(X, tf.float32) / 255. 
with tf.variable_scope("convnet"): for num_outputs, kernel_size, stride in convs: - out = layers.convolution2d(out, + out = tf.contrib.layers.convolution2d(out, num_outputs=num_outputs, kernel_size=kernel_size, stride=stride, diff --git a/baselines/common/mpi_adam_optimizer.py b/baselines/common/mpi_adam_optimizer.py index acff294..3d7cee5 100644 --- a/baselines/common/mpi_adam_optimizer.py +++ b/baselines/common/mpi_adam_optimizer.py @@ -2,6 +2,7 @@ import numpy as np import tensorflow as tf from baselines.common import tf_util as U from baselines.common.tests.test_with_mpi import with_mpi +from baselines import logger try: from mpi4py import MPI except ImportError: @@ -9,22 +10,34 @@ except ImportError: class MpiAdamOptimizer(tf.train.AdamOptimizer): """Adam optimizer that averages gradients across mpi processes.""" - def __init__(self, comm, **kwargs): + def __init__(self, comm, grad_clip=None, mpi_rank_weight=1, **kwargs): self.comm = comm + self.grad_clip = grad_clip + self.mpi_rank_weight = mpi_rank_weight tf.train.AdamOptimizer.__init__(self, **kwargs) def compute_gradients(self, loss, var_list, **kwargs): grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs) grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None] - flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) + flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) * self.mpi_rank_weight shapes = [v.shape.as_list() for g, v in grads_and_vars] sizes = [int(np.prod(s)) for s in shapes] - num_tasks = self.comm.Get_size() + + total_weight = np.zeros(1, np.float32) + self.comm.Allreduce(np.array([self.mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM) + total_weight = total_weight[0] + buf = np.zeros(sum(sizes), np.float32) countholder = [0] # Counts how many times _collect_grads has been called stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable def _collect_grads(flat_grad, np_stat): + if self.grad_clip is not None: + gradnorm = np.linalg.norm(flat_grad) + if gradnorm > 1: + flat_grad /= gradnorm + logger.logkv_mean('gradnorm', gradnorm) + logger.logkv_mean('gradclipfrac', float(gradnorm > 1)) self.comm.Allreduce(flat_grad, buf, op=MPI.SUM) - np.divide(buf, float(num_tasks), out=buf) + np.divide(buf, float(total_weight), out=buf) if countholder[0] % 100 == 0: check_synced(np_stat, self.comm) countholder[0] += 1 @@ -51,8 +64,8 @@ def check_synced(localval, comm=None): comm = comm or MPI.COMM_WORLD vals = comm.gather(localval) if comm.rank == 0: - assert all(val==vals[0] for val in vals[1:]) - + assert all(val==vals[0] for val in vals[1:]),\ + f'MpiAdamOptimizer detected that different workers have different weights: {vals}' @with_mpi(timeout=5) def test_nonfreeze(): @@ -75,4 +88,3 @@ def test_nonfreeze(): l,_ = sess.run([loss, update_op]) print(i, l) losslist_ref.append(l) - diff --git a/baselines/common/plot_util.py b/baselines/common/plot_util.py index efd2e38..e15c508 100644 --- a/baselines/common/plot_util.py +++ b/baselines/common/plot_util.py @@ -90,6 +90,8 @@ def one_sided_ema(xolds, yolds, low=None, high=None, n=512, decay_steps=1., low_ sum_y *= interstep_decay count_y *= interstep_decay while True: + if luoi >= len(xolds): + break xold = xolds[luoi] if xold <= xnew: decay = np.exp(- (xnew - xold) / decay_period) @@ -98,8 +100,6 @@ def one_sided_ema(xolds, yolds, low=None, high=None, n=512, decay_steps=1., low_ luoi += 1 else: break - if luoi >= len(xolds): - break sum_ys[i] = sum_y 
count_ys[i] = count_y @@ -248,7 +248,10 @@ def plot_results( figsize=None, legend_outside=False, resample=0, - smooth_step=1.0 + smooth_step=1.0, + tiling='vertical', + xlabel=None, + ylabel=None ): ''' Plot multiple Results objects @@ -300,9 +303,23 @@ def plot_results( sk2r[splitkey].append(result) assert len(sk2r) > 0 assert isinstance(resample, int), "0: don't resample. : that many samples" - nrows = len(sk2r) - ncols = 1 - figsize = figsize or (6, 6 * nrows) + if tiling == 'vertical' or tiling is None: + nrows = len(sk2r) + ncols = 1 + elif tiling == 'horizontal': + ncols = len(sk2r) + nrows = 1 + elif tiling == 'symmetric': + import math + N = len(sk2r) + largest_divisor = 1 + for i in range(1, int(math.sqrt(N))+1): + if N % i == 0: + largest_divisor = i + ncols = largest_divisor + nrows = N // ncols + figsize = figsize or (6 * ncols, 6 * nrows) + f, axarr = plt.subplots(nrows, ncols, sharex=False, squeeze=False, figsize=figsize) groups = list(set(group_fn(result) for result in allresults)) @@ -316,7 +333,9 @@ def plot_results( g2c = defaultdict(int) sresults = sk2r[sk] gresults = defaultdict(list) - ax = axarr[isplit][0] + idx_row = isplit // ncols + idx_col = isplit % ncols + ax = axarr[idx_row][idx_col] for result in sresults: group = group_fn(result) g2c[group] += 1 @@ -355,7 +374,7 @@ def plot_results( ymean = np.mean(ys, axis=0) ystd = np.std(ys, axis=0) ystderr = ystd / np.sqrt(len(ys)) - l, = axarr[isplit][0].plot(usex, ymean, color=color) + l, = axarr[idx_row][idx_col].plot(usex, ymean, color=color) g2l[group] = l if shaded_err: ax.fill_between(usex, ymean - ystderr, ymean + ystderr, color=color, alpha=.4) @@ -372,6 +391,17 @@ def plot_results( loc=2 if legend_outside else None, bbox_to_anchor=(1,1) if legend_outside else None) ax.set_title(sk) + # add xlabels, but only to the bottom row + if xlabel is not None: + for ax in axarr[-1]: + plt.sca(ax) + plt.xlabel(xlabel) + # add ylabels, but only to left column + if ylabel is not None: + for ax in axarr[:,0]: + plt.sca(ax) + plt.ylabel(ylabel) + return f, axarr def regression_analysis(df): diff --git a/baselines/common/test_mpi_util.py b/baselines/common/test_mpi_util.py index b1146ab..8b94420 100644 --- a/baselines/common/test_mpi_util.py +++ b/baselines/common/test_mpi_util.py @@ -1,10 +1,13 @@ +from baselines.common import mpi_util from baselines import logger from baselines.common.tests.test_with_mpi import with_mpi -from baselines.common import mpi_util +try: + from mpi4py import MPI +except ImportError: + MPI = None @with_mpi() def test_mpi_weighted_mean(): - from mpi4py import MPI comm = MPI.COMM_WORLD with logger.scoped_configure(comm=comm): if comm.rank == 0: @@ -13,7 +16,6 @@ def test_mpi_weighted_mean(): name2valcount = {'a' : (19, 1), 'c' : (42,3)} else: raise NotImplementedError - d = mpi_util.mpi_weighted_mean(comm, name2valcount) correctval = {'a' : (10 * 2 + 19) / 3.0, 'b' : 20, 'c' : 42} if comm.rank == 0: diff --git a/baselines/common/tests/__init__.py b/baselines/common/tests/__init__.py index e69de29..a6561a2 100644 --- a/baselines/common/tests/__init__.py +++ b/baselines/common/tests/__init__.py @@ -0,0 +1,2 @@ +import os, pytest +mark_slow = pytest.mark.skipif(not os.getenv('RUNSLOW'), reason='slow') \ No newline at end of file diff --git a/baselines/common/tests/envs/fixed_sequence_env.py b/baselines/common/tests/envs/fixed_sequence_env.py index f5460d5..b3fe396 100644 --- a/baselines/common/tests/envs/fixed_sequence_env.py +++ b/baselines/common/tests/envs/fixed_sequence_env.py @@ -9,18 +9,16 @@ class 
FixedSequenceEnv(Env): n_actions=10, episode_len=100 ): - self.np_random = np.random.RandomState() - self.sequence = None - self.action_space = Discrete(n_actions) self.observation_space = Discrete(1) - + self.np_random = np.random.RandomState(0) self.episode_len = episode_len + self.sequence = [self.np_random.randint(0, self.action_space.n) + for _ in range(self.episode_len)] self.time = 0 + def reset(self): - if self.sequence is None: - self.sequence = [self.np_random.randint(0, self.action_space.n-1) for _ in range(self.episode_len)] self.time = 0 return 0 @@ -29,7 +27,6 @@ class FixedSequenceEnv(Env): self._choose_next_state() done = False if self.episode_len and self.time >= self.episode_len: - rew = 0 done = True return 0, rew, done, {} diff --git a/baselines/common/tests/envs/identity_env.py b/baselines/common/tests/envs/identity_env.py index 79e6c48..fb2dca6 100644 --- a/baselines/common/tests/envs/identity_env.py +++ b/baselines/common/tests/envs/identity_env.py @@ -2,43 +2,45 @@ import numpy as np from abc import abstractmethod from gym import Env from gym.spaces import MultiDiscrete, Discrete, Box - +from collections import deque class IdentityEnv(Env): def __init__( self, - episode_len=None + episode_len=None, + delay=0, + zero_first_rewards=True ): self.observation_space = self.action_space self.episode_len = episode_len self.time = 0 - self.reset() + self.delay = delay + self.zero_first_rewards = zero_first_rewards + self.q = deque(maxlen=delay+1) def reset(self): - self._choose_next_state() + self.q.clear() + for _ in range(self.delay + 1): + self.q.append(self.action_space.sample()) self.time = 0 - return self.state + return self.q[-1] def step(self, actions): - rew = self._get_reward(actions) - self._choose_next_state() - done = False - if self.episode_len and self.time >= self.episode_len: - done = True - - return self.state, rew, done, {} + rew = self._get_reward(self.q.popleft(), actions) + if self.zero_first_rewards and self.time < self.delay: + rew = 0 + self.q.append(self.action_space.sample()) + self.time += 1 + done = self.episode_len is not None and self.time >= self.episode_len + return self.q[-1], rew, done, {} def seed(self, seed=None): self.action_space.seed(seed) - def _choose_next_state(self): - self.state = self.action_space.sample() - self.time += 1 - @abstractmethod - def _get_reward(self, actions): + def _get_reward(self, state, actions): raise NotImplementedError @@ -47,26 +49,29 @@ class DiscreteIdentityEnv(IdentityEnv): self, dim, episode_len=None, + delay=0, + zero_first_rewards=True ): self.action_space = Discrete(dim) - super().__init__(episode_len=episode_len) + super().__init__(episode_len=episode_len, delay=delay, zero_first_rewards=zero_first_rewards) - def _get_reward(self, actions): - return 1 if self.state == actions else 0 + def _get_reward(self, state, actions): + return 1 if state == actions else 0 class MultiDiscreteIdentityEnv(IdentityEnv): def __init__( self, dims, episode_len=None, + delay=0, ): self.action_space = MultiDiscrete(dims) - super().__init__(episode_len=episode_len) + super().__init__(episode_len=episode_len, delay=delay) - def _get_reward(self, actions): - return 1 if all(self.state == actions) else 0 + def _get_reward(self, state, actions): + return 1 if all(state == actions) else 0 class BoxIdentityEnv(IdentityEnv): @@ -79,7 +84,7 @@ class BoxIdentityEnv(IdentityEnv): self.action_space = Box(low=-1.0, high=1.0, shape=shape, dtype=np.float32) super().__init__(episode_len=episode_len) - def _get_reward(self, actions): - 
diff = actions - self.state + def _get_reward(self, state, actions): + diff = actions - state diff = diff[:] return -0.5 * np.dot(diff, diff) diff --git a/baselines/common/tests/envs/identity_env_test.py b/baselines/common/tests/envs/identity_env_test.py new file mode 100644 index 0000000..c73ee57 --- /dev/null +++ b/baselines/common/tests/envs/identity_env_test.py @@ -0,0 +1,36 @@ +from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv + + +def test_discrete_nodelay(): + nsteps = 100 + eplen = 50 + env = DiscreteIdentityEnv(10, episode_len=eplen) + ob = env.reset() + for t in range(nsteps): + action = env.action_space.sample() + next_ob, rew, done, info = env.step(action) + assert rew == (1 if action == ob else 0) + if (t + 1) % eplen == 0: + assert done + next_ob = env.reset() + else: + assert not done + ob = next_ob + +def test_discrete_delay1(): + eplen = 50 + env = DiscreteIdentityEnv(10, episode_len=eplen, delay=1) + ob = env.reset() + prev_ob = None + for t in range(eplen): + action = env.action_space.sample() + next_ob, rew, done, info = env.step(action) + if t > 0: + assert rew == (1 if action == prev_ob else 0) + else: + assert rew == 0 + prev_ob = ob + ob = next_ob + if t < eplen - 1: + assert not done + assert done diff --git a/baselines/common/tests/test_cartpole.py b/baselines/common/tests/test_cartpole.py index 475ad1d..f9d5ac6 100644 --- a/baselines/common/tests/test_cartpole.py +++ b/baselines/common/tests/test_cartpole.py @@ -3,6 +3,7 @@ import gym from baselines.run import get_learn_function from baselines.common.tests.util import reward_per_episode_test +from baselines.common.tests import mark_slow common_kwargs = dict( total_timesteps=30000, @@ -20,7 +21,7 @@ learn_kwargs = { 'trpo_mpi': {} } -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", learn_kwargs.keys()) def test_cartpole(alg): ''' diff --git a/baselines/common/tests/test_fetchreach.py b/baselines/common/tests/test_fetchreach.py index be73663..8bcd32b 100644 --- a/baselines/common/tests/test_fetchreach.py +++ b/baselines/common/tests/test_fetchreach.py @@ -3,6 +3,7 @@ import gym from baselines.run import get_learn_function from baselines.common.tests.util import reward_per_episode_test +from baselines.common.tests import mark_slow pytest.importorskip('mujoco_py') @@ -15,7 +16,7 @@ learn_kwargs = { 'her': dict(total_timesteps=2000) } -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", learn_kwargs.keys()) def test_fetchreach(alg): ''' diff --git a/baselines/common/tests/test_fixed_sequence.py b/baselines/common/tests/test_fixed_sequence.py index 061c375..68ee8d3 100644 --- a/baselines/common/tests/test_fixed_sequence.py +++ b/baselines/common/tests/test_fixed_sequence.py @@ -3,6 +3,8 @@ from baselines.common.tests.envs.fixed_sequence_env import FixedSequenceEnv from baselines.common.tests.util import simple_test from baselines.run import get_learn_function +from baselines.common.tests import mark_slow + common_kwargs = dict( seed=0, @@ -21,7 +23,7 @@ learn_kwargs = { alg_list = learn_kwargs.keys() rnn_list = ['lstm'] -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", alg_list) @pytest.mark.parametrize("rnn", rnn_list) def test_fixed_sequence(alg, rnn): diff --git a/baselines/common/tests/test_identity.py b/baselines/common/tests/test_identity.py index c950e5a..6b66a66 100644 --- a/baselines/common/tests/test_identity.py +++ b/baselines/common/tests/test_identity.py @@ -2,6 +2,7 @@ import pytest from baselines.common.tests.envs.identity_env import 
DiscreteIdentityEnv, BoxIdentityEnv, MultiDiscreteIdentityEnv from baselines.run import get_learn_function from baselines.common.tests.util import simple_test +from baselines.common.tests import mark_slow common_kwargs = dict( total_timesteps=30000, @@ -24,7 +25,7 @@ algos_disc = ['a2c', 'acktr', 'deepq', 'ppo2', 'trpo_mpi'] algos_multidisc = ['a2c', 'acktr', 'ppo2', 'trpo_mpi'] algos_cont = ['a2c', 'acktr', 'ddpg', 'ppo2', 'trpo_mpi'] -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", algos_disc) def test_discrete_identity(alg): ''' @@ -39,7 +40,7 @@ def test_discrete_identity(alg): env_fn = lambda: DiscreteIdentityEnv(10, episode_len=100) simple_test(env_fn, learn_fn, 0.9) -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", algos_multidisc) def test_multidiscrete_identity(alg): ''' @@ -54,7 +55,7 @@ def test_multidiscrete_identity(alg): env_fn = lambda: MultiDiscreteIdentityEnv((3,3), episode_len=100) simple_test(env_fn, learn_fn, 0.9) -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", algos_cont) def test_continuous_identity(alg): ''' diff --git a/baselines/common/tests/test_mnist.py b/baselines/common/tests/test_mnist.py index bacc914..06a4e2b 100644 --- a/baselines/common/tests/test_mnist.py +++ b/baselines/common/tests/test_mnist.py @@ -4,7 +4,7 @@ import pytest from baselines.common.tests.envs.mnist_env import MnistEnv from baselines.common.tests.util import simple_test from baselines.run import get_learn_function - +from baselines.common.tests import mark_slow # TODO investigate a2c and ppo2 failures - is it due to bad hyperparameters for this problem? # GitHub issue https://github.com/openai/baselines/issues/189 @@ -28,7 +28,7 @@ learn_args = { #tests pass, but are too slow on travis. Same algorithms are covered # by other tests with less compute-hungry nn's and by benchmarks @pytest.mark.skip -@pytest.mark.slow +@mark_slow @pytest.mark.parametrize("alg", learn_args.keys()) def test_mnist(alg): ''' diff --git a/baselines/common/tests/test_plot_util.py b/baselines/common/tests/test_plot_util.py new file mode 100644 index 0000000..be33308 --- /dev/null +++ b/baselines/common/tests/test_plot_util.py @@ -0,0 +1,17 @@ +# smoke tests of plot_util +from baselines.common import plot_util as pu +from baselines.common.tests.util import smoketest + + +def test_plot_util(): + nruns = 4 + logdirs = [smoketest('--alg=ppo2 --env=CartPole-v0 --num_timesteps=10000') for _ in range(nruns)] + data = pu.load_results(logdirs) + assert len(data) == 4 + + _, axes = pu.plot_results(data[:1]); assert len(axes) == 1 + _, axes = pu.plot_results(data, tiling='vertical'); assert axes.shape==(4,1) + _, axes = pu.plot_results(data, tiling='horizontal'); assert axes.shape==(1,4) + _, axes = pu.plot_results(data, tiling='symmetric'); assert axes.shape==(2,2) + _, axes = pu.plot_results(data, split_fn=lambda _: ''); assert len(axes) == 1 + diff --git a/baselines/common/tests/test_with_mpi.py b/baselines/common/tests/test_with_mpi.py index 86be475..9388078 100644 --- a/baselines/common/tests/test_with_mpi.py +++ b/baselines/common/tests/test_with_mpi.py @@ -4,6 +4,7 @@ import subprocess import cloudpickle import base64 import pytest +from functools import wraps try: from mpi4py import MPI @@ -12,6 +13,7 @@ except ImportError: def with_mpi(nproc=2, timeout=30, skip_if_no_mpi=True): def outer_thunk(fn): + @wraps(fn) def thunk(*args, **kwargs): serialized_fn = base64.b64encode(cloudpickle.dumps(lambda: fn(*args, **kwargs))) subprocess.check_call([ diff --git 
a/baselines/common/tests/util.py b/baselines/common/tests/util.py index 38ea4dc..b3d31fe 100644 --- a/baselines/common/tests/util.py +++ b/baselines/common/tests/util.py @@ -5,6 +5,12 @@ from baselines.common.vec_env.dummy_vec_env import DummyVecEnv N_TRIALS = 10000 N_EPISODES = 100 +_sess_config = tf.ConfigProto( + allow_soft_placement=True, + intra_op_parallelism_threads=1, + inter_op_parallelism_threads=1 +) + def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS): def seeded_env_fn(): env = env_fn() @@ -13,7 +19,7 @@ def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS): np.random.seed(0) env = DummyVecEnv([seeded_env_fn]) - with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default(): + with tf.Graph().as_default(), tf.Session(config=_sess_config).as_default(): tf.set_random_seed(0) model = learn_fn(env) sum_rew = 0 @@ -34,7 +40,7 @@ def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS): def reward_per_episode_test(env_fn, learn_fn, min_avg_reward, n_trials=N_EPISODES): env = DummyVecEnv([env_fn]) - with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default(): + with tf.Graph().as_default(), tf.Session(config=_sess_config).as_default(): model = learn_fn(env) N_TRIALS = 100 observations, actions, rewards = rollout(env, model, N_TRIALS) @@ -71,3 +77,16 @@ def rollout(env, model, n_trials): observations.append(episode_obs) return observations, actions, rewards + +def smoketest(argstr, **kwargs): + import tempfile + import subprocess + import os + argstr = 'python -m baselines.run ' + argstr + for key, value in kwargs: + argstr += ' --{}={}'.format(key, value) + tempdir = tempfile.mkdtemp() + env = os.environ.copy() + env['OPENAI_LOGDIR'] = tempdir + subprocess.run(argstr.split(' '), env=env) + return tempdir diff --git a/baselines/common/vec_env/vec_env.py b/baselines/common/vec_env/vec_env.py index 7aa7878..fc6098e 100644 --- a/baselines/common/vec_env/vec_env.py +++ b/baselines/common/vec_env/vec_env.py @@ -145,8 +145,7 @@ class VecEnvWrapper(VecEnv): def __init__(self, venv, observation_space=None, action_space=None): self.venv = venv - VecEnv.__init__(self, - num_envs=venv.num_envs, + super().__init__(num_envs=venv.num_envs, observation_space=observation_space or venv.observation_space, action_space=action_space or venv.action_space) @@ -170,6 +169,11 @@ class VecEnvWrapper(VecEnv): def get_images(self): return self.venv.get_images() + def __getattr__(self, name): + if name.startswith('_'): + raise AttributeError("attempted to get missing private attribute '{}'".format(name)) + return getattr(self.venv, name) + class VecEnvObservationWrapper(VecEnvWrapper): @abstractmethod def process(self, obs): diff --git a/baselines/common/vec_env/vec_monitor.py b/baselines/common/vec_env/vec_monitor.py index 6e67378..efaafc9 100644 --- a/baselines/common/vec_env/vec_monitor.py +++ b/baselines/common/vec_env/vec_monitor.py @@ -5,16 +5,18 @@ import time from collections import deque class VecMonitor(VecEnvWrapper): - def __init__(self, venv, filename=None, keep_buf=0): + def __init__(self, venv, filename=None, keep_buf=0, info_keywords=()): VecEnvWrapper.__init__(self, venv) self.eprets = None self.eplens = None self.epcount = 0 self.tstart = time.time() if filename: - self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart}) + self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart}, + extra_keys=info_keywords) 
else: self.results_writer = None + self.info_keywords = info_keywords self.keep_buf = keep_buf if self.keep_buf: self.epret_buf = deque([], maxlen=keep_buf) @@ -30,11 +32,16 @@ class VecMonitor(VecEnvWrapper): obs, rews, dones, infos = self.venv.step_wait() self.eprets += rews self.eplens += 1 - newinfos = [] - for (i, (done, ret, eplen, info)) in enumerate(zip(dones, self.eprets, self.eplens, infos)): - info = info.copy() - if done: + + newinfos = list(infos[:]) + for i in range(len(dones)): + if dones[i]: + info = infos[i].copy() + ret = self.eprets[i] + eplen = self.eplens[i] epinfo = {'r': ret, 'l': eplen, 't': round(time.time() - self.tstart, 6)} + for k in self.info_keywords: + epinfo[k] = info[k] info['episode'] = epinfo if self.keep_buf: self.epret_buf.append(ret) @@ -44,6 +51,5 @@ class VecMonitor(VecEnvWrapper): self.eplens[i] = 0 if self.results_writer: self.results_writer.write_row(epinfo) - newinfos.append(info) - + newinfos[i] = info return obs, rews, dones, newinfos diff --git a/baselines/common/vec_env/vec_normalize.py b/baselines/common/vec_env/vec_normalize.py index 40bd04b..51a4515 100644 --- a/baselines/common/vec_env/vec_normalize.py +++ b/baselines/common/vec_env/vec_normalize.py @@ -1,5 +1,4 @@ from . import VecEnvWrapper -from baselines.common.running_mean_std import TfRunningMeanStd, RunningMeanStd import numpy as np class VecNormalize(VecEnvWrapper): @@ -11,9 +10,11 @@ class VecNormalize(VecEnvWrapper): def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8, use_tf=False): VecEnvWrapper.__init__(self, venv) if use_tf: + from baselines.common.running_mean_std import TfRunningMeanStd self.ob_rms = TfRunningMeanStd(shape=self.observation_space.shape, scope='ob_rms') if ob else None self.ret_rms = TfRunningMeanStd(shape=(), scope='ret_rms') if ret else None else: + from baselines.common.running_mean_std import RunningMeanStd self.ob_rms = RunningMeanStd(shape=self.observation_space.shape) if ob else None self.ret_rms = RunningMeanStd(shape=()) if ret else None self.clipob = clipob diff --git a/baselines/common/vec_env/vec_remove_dict_obs.py b/baselines/common/vec_env/vec_remove_dict_obs.py index 602b949..a6c4656 100644 --- a/baselines/common/vec_env/vec_remove_dict_obs.py +++ b/baselines/common/vec_env/vec_remove_dict_obs.py @@ -1,6 +1,5 @@ from .vec_env import VecEnvObservationWrapper - class VecExtractDictObs(VecEnvObservationWrapper): def __init__(self, venv, key): self.key = key @@ -8,4 +7,4 @@ class VecExtractDictObs(VecEnvObservationWrapper): observation_space=venv.observation_space.spaces[self.key]) def process(self, obs): - return obs[self.key] \ No newline at end of file + return obs[self.key] diff --git a/baselines/common/wrappers.py b/baselines/common/wrappers.py index 7683d18..e5e93b0 100644 --- a/baselines/common/wrappers.py +++ b/baselines/common/wrappers.py @@ -16,4 +16,14 @@ class TimeLimit(gym.Wrapper): def reset(self, **kwargs): self._elapsed_steps = 0 - return self.env.reset(**kwargs) \ No newline at end of file + return self.env.reset(**kwargs) + +class ClipActionsWrapper(gym.Wrapper): + def step(self, action): + import numpy as np + action = np.nan_to_num(action) + action = np.clip(action, self.action_space.low, self.action_space.high) + return self.env.step(action) + + def reset(self, **kwargs): + return self.env.reset(**kwargs) diff --git a/baselines/ddpg/ddpg.py b/baselines/ddpg/ddpg.py index 37551d4..dd53ce1 100755 --- a/baselines/ddpg/ddpg.py +++ b/baselines/ddpg/ddpg.py @@ -217,7 +217,9 @@ def 
learn(network, env, stats = agent.get_stats() combined_stats = stats.copy() combined_stats['rollout/return'] = np.mean(epoch_episode_rewards) + combined_stats['rollout/return_std'] = np.std(epoch_episode_rewards) combined_stats['rollout/return_history'] = np.mean(episode_rewards_history) + combined_stats['rollout/return_history_std'] = np.std(episode_rewards_history) combined_stats['rollout/episode_steps'] = np.mean(epoch_episode_steps) combined_stats['rollout/actions_mean'] = np.mean(epoch_actions) combined_stats['rollout/Q_mean'] = np.mean(epoch_qs) diff --git a/baselines/ddpg/test_smoke.py b/baselines/ddpg/test_smoke.py index a9fdc05..bd7eba6 100644 --- a/baselines/ddpg/test_smoke.py +++ b/baselines/ddpg/test_smoke.py @@ -1,10 +1,6 @@ -from multiprocessing import Process -import baselines.run - +from baselines.common.tests.util import smoketest def _run(argstr): - p = Process(target=baselines.run.main, args=('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr).split(' ')) - p.start() - p.join() + smoketest('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr) def test_popart(): _run('--normalize_returns=True --popart=True') diff --git a/baselines/logger.py b/baselines/logger.py index 6c08ca0..36b0c98 100644 --- a/baselines/logger.py +++ b/baselines/logger.py @@ -38,8 +38,8 @@ class HumanOutputFormat(KVWriter, SeqWriter): # Create strings for printing key2str = {} for (key, val) in sorted(kvs.items()): - if isinstance(val, float): - valstr = '%-8.3g' % (val,) + if hasattr(val, '__float__'): + valstr = '%-8.3g' % val else: valstr = str(val) key2str[self._truncate(key)] = self._truncate(valstr) @@ -92,7 +92,6 @@ class JSONOutputFormat(KVWriter): def writekvs(self, kvs): for k, v in sorted(kvs.items()): if hasattr(v, 'dtype'): - v = v.tolist() kvs[k] = float(v) self.file.write(json.dumps(kvs) + '\n') self.file.flush() @@ -361,7 +360,16 @@ class Logger(object): if isinstance(fmt, SeqWriter): fmt.writeseq(map(str, args)) -def configure(dir=None, format_strs=None, comm=None): +def get_rank_without_mpi_import(): + # check environment variables here instead of importing mpi4py + # to avoid calling MPI_Init() when this module is imported + for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']: + if varname in os.environ: + return int(os.environ[varname]) + return 0 + + +def configure(dir=None, format_strs=None, comm=None, log_suffix=''): """ If comm is provided, average all numerical stats across that comm """ @@ -373,15 +381,9 @@ def configure(dir=None, format_strs=None, comm=None): assert isinstance(dir, str) os.makedirs(dir, exist_ok=True) - log_suffix = '' - rank = 0 - # check environment variables here instead of importing mpi4py - # to avoid calling MPI_Init() when this module is imported - for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']: - if varname in os.environ: - rank = int(os.environ[varname]) + rank = get_rank_without_mpi_import() if rank > 0: - log_suffix = "-rank%03i" % rank + log_suffix = log_suffix + "-rank%03i" % rank if format_strs is None: if rank == 0: diff --git a/baselines/ppo2/microbatched_model.py b/baselines/ppo2/microbatched_model.py index 6735ed4..a35b830 100644 --- a/baselines/ppo2/microbatched_model.py +++ b/baselines/ppo2/microbatched_model.py @@ -8,7 +8,7 @@ class MicrobatchedModel(Model): on the entire minibatch causes some overflow """ def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train, - nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size): + nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight, comm, 
microbatch_size): self.nmicrobatches = nbatch_train // microbatch_size self.microbatch_size = microbatch_size @@ -23,7 +23,9 @@ class MicrobatchedModel(Model): nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef, - max_grad_norm=max_grad_norm) + max_grad_norm=max_grad_norm, + mpi_rank_weight=mpi_rank_weight, + comm=comm) self.grads_ph = [tf.placeholder(dtype=g.dtype, shape=g.shape) for g in self.grads] grads_ph_and_vars = list(zip(self.grads_ph, self.var)) diff --git a/baselines/ppo2/model.py b/baselines/ppo2/model.py index 2326b46..3d56bc9 100644 --- a/baselines/ppo2/model.py +++ b/baselines/ppo2/model.py @@ -25,9 +25,12 @@ class Model(object): - Save load the model """ def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train, - nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size=None): + nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight=1, comm=None, microbatch_size=None): self.sess = sess = get_session() + if MPI is not None and comm is None: + comm = MPI.COMM_WORLD + with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE): # CREATE OUR TWO MODELS # act_model that is used for sampling @@ -91,8 +94,8 @@ class Model(object): # 1. Get the model parameters params = tf.trainable_variables('ppo2_model') # 2. Build our trainer - if MPI is not None: - self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5) + if comm is not None and comm.Get_size() > 1: + self.trainer = MpiAdamOptimizer(comm, learning_rate=LR, mpi_rank_weight=mpi_rank_weight, epsilon=1e-5) else: self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5) # 3. Calculate the gradients @@ -125,7 +128,7 @@ class Model(object): initialize() global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="") if MPI is not None: - sync_from_root(sess, global_variables) #pylint: disable=E1101 + sync_from_root(sess, global_variables, comm=comm) #pylint: disable=E1101 def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None): # Here we calculate advantage A(s,a) = R + yV(s') - V(s) diff --git a/baselines/ppo2/ppo2.py b/baselines/ppo2/ppo2.py index 7f3d204..d307e9b 100644 --- a/baselines/ppo2/ppo2.py +++ b/baselines/ppo2/ppo2.py @@ -21,7 +21,7 @@ def constfn(val): def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4, vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95, log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2, - save_interval=0, load_path=None, model_fn=None, **network_kwargs): + save_interval=0, load_path=None, model_fn=None, update_fn=None, init_fn=None, mpi_rank_weight=1, comm=None, **network_kwargs): ''' Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347) @@ -97,6 +97,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2 # Calculate the batch_size nbatch = nenvs * nsteps nbatch_train = nbatch // nminibatches + is_mpi_root = (MPI is None or MPI.COMM_WORLD.Get_rank() == 0) # Instantiate the model object (that creates act_model and train_model) if model_fn is None: @@ -105,7 +106,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2 model = model_fn(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=nenvs, nbatch_train=nbatch_train, nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef, - max_grad_norm=max_grad_norm) + max_grad_norm=max_grad_norm, comm=comm, mpi_rank_weight=mpi_rank_weight) if load_path is not None: model.load(load_path) @@ -118,6 +119,9 @@ def 
learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2 if eval_env is not None: eval_epinfobuf = deque(maxlen=100) + if init_fn is not None: + init_fn() + # Start total timer tfirststart = time.perf_counter() @@ -131,11 +135,16 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2 lrnow = lr(frac) # Calculate the cliprange cliprangenow = cliprange(frac) + + if update % log_interval == 0 and is_mpi_root: logger.info('Stepping environment...') + # Get minibatch obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632 if eval_env is not None: eval_obs, eval_returns, eval_masks, eval_actions, eval_values, eval_neglogpacs, eval_states, eval_epinfos = eval_runner.run() #pylint: disable=E0632 + if update % log_interval == 0 and is_mpi_root: logger.info('Done.') + epinfobuf.extend(epinfos) if eval_env is not None: eval_epinfobuf.extend(eval_epinfos) @@ -176,31 +185,36 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2 tnow = time.perf_counter() # Calculate the fps (frame per second) fps = int(nbatch / (tnow - tstart)) + + if update_fn is not None: + update_fn(update) + if update % log_interval == 0 or update == 1: # Calculates if value function is a good predicator of the returns (ev > 1) # or if it's just worse than predicting nothing (ev =< 0) ev = explained_variance(values, returns) - logger.logkv("serial_timesteps", update*nsteps) - logger.logkv("nupdates", update) - logger.logkv("total_timesteps", update*nbatch) + logger.logkv("misc/serial_timesteps", update*nsteps) + logger.logkv("misc/nupdates", update) + logger.logkv("misc/total_timesteps", update*nbatch) logger.logkv("fps", fps) - logger.logkv("explained_variance", float(ev)) + logger.logkv("misc/explained_variance", float(ev)) logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf])) logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf])) if eval_env is not None: logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]) ) logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]) ) - logger.logkv('time_elapsed', tnow - tfirststart) + logger.logkv('misc/time_elapsed', tnow - tfirststart) for (lossval, lossname) in zip(lossvals, model.loss_names): - logger.logkv(lossname, lossval) - if MPI is None or MPI.COMM_WORLD.Get_rank() == 0: - logger.dumpkvs() - if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0): + logger.logkv('loss/' + lossname, lossval) + + logger.dumpkvs() + if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and is_mpi_root: checkdir = osp.join(logger.get_dir(), 'checkpoints') os.makedirs(checkdir, exist_ok=True) savepath = osp.join(checkdir, '%.5i'%update) print('Saving to', savepath) model.save(savepath) + return model # Avoid division error when calculate the mean (in our case if epinfo is empty returns np.nan, not return an error) def safemean(xs): diff --git a/setup.py b/setup.py index 894f302..448c875 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,9 @@ extras = { 'filelock', 'pytest', 'pytest-forked', - 'atari-py' + 'atari-py', + 'matplotlib', + 'pandas' ], 'mpi': [ 'mpi4py' @@ -33,8 +35,6 @@ setup(name='baselines', 'scipy', 'tqdm', 'joblib', - 'dill', - 'progressbar2', 'cloudpickle', 'click', 'opencv-python'
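
WarpFrame's new `dict_space_key` argument lets the wrapper operate on one entry of a dictionary observation instead of the raw frame. A minimal sketch of the intended use, assuming a hypothetical environment id `MyDictObsEnv-v0` whose observation dict carries an HxWx3 uint8 array under the key 'image' (the assert in `__init__` requires exactly that layout):

import gym
from baselines.common.atari_wrappers import WarpFrame

# 'MyDictObsEnv-v0' and the 'image' key are assumptions for illustration only.
env = gym.make('MyDictObsEnv-v0')
env = WarpFrame(env, width=84, height=84, grayscale=True, dict_space_key='image')

obs = env.reset()
# Only the selected entry is resized and grayscaled; the rest of the dict is untouched.
assert obs['image'].shape == (84, 84, 1)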
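
`make_env` now accepts env ids of the form `module:EnvId` (the module is imported before `gym.make`, so custom environments can self-register) and forwards `env_kwargs` to the environment constructor. A sketch under the assumption that a package named `my_envs` registers `MyEnv-v0` on import and takes a `difficulty` keyword:

from baselines.common.cmd_util import make_vec_env

# 'my_envs', 'MyEnv-v0' and 'difficulty' are placeholders; any installed package
# that registers its environments at import time works the same way.
venv = make_vec_env(
    'my_envs:MyEnv-v0',            # importlib.import_module('my_envs') runs first
    env_type='custom',             # anything other than 'atari'/'retro' falls through to gym.make
    num_env=4,
    seed=0,
    env_kwargs={'difficulty': 2},  # passed as gym.make(env_id, **env_kwargs)
)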
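
Continuous-control environments built through `make_env` are now wrapped in `ClipActionsWrapper`, which replaces NaNs and clips actions to the Box bounds before stepping the underlying env. The wrapper can also be applied by hand; a minimal sketch on Pendulum:

import gym
import numpy as np
from baselines.common.wrappers import ClipActionsWrapper

env = ClipActionsWrapper(gym.make('Pendulum-v0'))
env.reset()
# An out-of-range action is clipped to the action space bound (2.0 for Pendulum).
obs, rew, done, info = env.step(np.array([10.0]))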
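
Because `impala_cnn` and `impala_cnn_lstm` are registered through `@register`, they can be selected by name anywhere `nature_cnn` could be. A minimal training sketch with ppo2 (the Atari id and timestep budget are arbitrary examples):

from baselines.common.cmd_util import make_vec_env
from baselines.common.vec_env.vec_frame_stack import VecFrameStack
from baselines.ppo2 import ppo2

# Standard Atari wiring: deepmind-wrapped envs plus a 4-frame stack.
venv = VecFrameStack(make_vec_env('PongNoFrameskip-v4', 'atari', num_env=8, seed=0), 4)
model = ppo2.learn(network='impala_cnn', env=venv, total_timesteps=int(1e5))

The same encoder can be picked from the command line with `--network=impala_cnn` (or `--network=impala_cnn_lstm`) via `python -m baselines.run`.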
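
In `MpiAdamOptimizer` each rank's flattened gradient is scaled by `mpi_rank_weight` and the Allreduce result is divided by the sum of the weights, so a rank with weight 0 stays synchronized without influencing the update. A sketch of how ppo2's new `comm`/`mpi_rank_weight` arguments might be used to dedicate the last MPI rank to evaluation (the rank convention here is an assumption for illustration):

from mpi4py import MPI
from baselines.common.cmd_util import make_vec_env
from baselines.ppo2 import ppo2

comm = MPI.COMM_WORLD
is_eval_rank = comm.Get_rank() == comm.Get_size() - 1      # assumed convention

venv = make_vec_env('CartPole-v0', 'classic_control', num_env=1, seed=comm.Get_rank())
model = ppo2.learn(
    network='mlp',
    env=venv,
    total_timesteps=int(1e5),
    comm=comm,
    mpi_rank_weight=0 if is_eval_rank else 1,  # eval rank receives synced weights, contributes no gradient
)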
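
`plot_results` now supports 'vertical' (default), 'horizontal', and 'symmetric' tilings, plus shared axis labels that are attached only to the bottom row and left column. A short sketch, assuming `~/logs` contains monitor output from earlier baselines runs:

from baselines.common import plot_util as pu

results = pu.load_results('~/logs')        # one Result per run directory found under ~/logs
fig, axes = pu.plot_results(
    results,
    tiling='symmetric',                    # near-square grid of subplots
    xlabel='timesteps',
    ylabel='episode reward',
)
fig.savefig('results.png', bbox_inches='tight')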
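
`VecEnvWrapper.__getattr__` now forwards unknown public attribute lookups to the wrapped venv (private names still raise `AttributeError`), so attributes of an inner wrapper stay reachable through outer ones. For example:

import gym
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.common.vec_env.vec_monitor import VecMonitor
from baselines.common.vec_env.vec_normalize import VecNormalize

venv = VecMonitor(VecNormalize(DummyVecEnv([lambda: gym.make('CartPole-v0')])))
# ob_rms is defined on VecNormalize; the lookup falls through VecMonitor.__getattr__.
print(venv.ob_rms.mean)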