From 5082e5d34b29e21c171cb372ffdbbe9d9e621232 Mon Sep 17 00:00:00 2001
From: Karl Cobbe
Date: Thu, 4 Apr 2019 13:52:00 -0700
Subject: [PATCH] Workbench (#303)

* begin workbench
* cleanup
* begin procgen config integration
* arg tweaks
* more args
* parameter saving
* begin procgen enjoy
* tweaks
* more workbench
* more args sync/restore
* cleanup
* merge in master
* rework args priority
* more workbench
* more loggign
* impala cnn
* impala lstm
* tweak
* tweaks
* rl19 time logging
* misc fixes
* faster pipeline
* update local.py
* sess and log config tweaks
* num processes
* logging tweaks
* difficulty reward wrapper
* logging fixes
* gin tweaks
* tweak
* fix
* task id
* param loading
* more variable loading
* entrypoint
* tweak
* ksync
* restore lstm
* begin rl19 support
* tweak
* rl19 rnn
* more rl19 integration
* fix
* cleanup
* restore rl19 rnn
* cleanup
* cleanup
* wrappers.get_log_info
* cleanup
* cleanup
* directory cleanup
* logging, num_experiments
* fixes
* cleanup
* gin fixes
* fix local max gpu
* resid nx
* num machines and download params
* rename
* cleanup
* create workbench
* more reorg
* fix
* more logging wrappers
* lint fix
* restore train procgen
* restore train procgen
* pylint fix
* better wrapping
* config sweep
* args sweep
* test workers
* mpi_weight
* train test comm and high difficulty fix
* enjoy show returns
* removing gin, procgen_parser
* removing gin
* procgen args
* config fixes
* cleanup
* cleanup
* procgen args fix
* fix
* rcall syncing
* lint
* rename mpi_weight
* use username for sync
* fixes
* microbatch fix
---
 baselines/common/models.py             | 52 +++++++++++++++++++++++---
 baselines/common/mpi_adam_optimizer.py | 13 +++++--
 baselines/ppo2/microbatched_model.py   |  5 ++-
 baselines/ppo2/model.py                |  4 +-
 baselines/ppo2/ppo2.py                 | 28 +++++++++-----
 5 files changed, 79 insertions(+), 23 deletions(-)

diff --git a/baselines/common/models.py b/baselines/common/models.py
index 0003079..0798916 100644
--- a/baselines/common/models.py
+++ b/baselines/common/models.py
@@ -3,7 +3,6 @@ import tensorflow as tf
 from baselines.a2c import utils
 from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch
 from baselines.common.mpi_running_mean_std import RunningMeanStd
-import tensorflow.contrib.layers as layers
 
 mapping = {}
 
@@ -26,6 +25,42 @@ def nature_cnn(unscaled_images, **conv_kwargs):
     h3 = conv_to_fc(h3)
     return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2)))
 
+def build_impala_cnn(unscaled_images, depths=[16,32,32], **conv_kwargs):
+    """
+    Model used in the paper "IMPALA: Scalable Distributed Deep-RL with
+    Importance Weighted Actor-Learner Architectures" https://arxiv.org/abs/1802.01561
+    """
+    def conv_layer(out, depth):
+        return tf.layers.conv2d(out, depth, 3, padding='same')
+
+    def residual_block(inputs):
+        depth = inputs.get_shape()[-1].value
+
+        out = tf.nn.relu(inputs)
+
+        out = conv_layer(out, depth)
+        out = tf.nn.relu(out)
+        out = conv_layer(out, depth)
+        return out + inputs
+
+    def conv_sequence(inputs, depth):
+        out = conv_layer(inputs, depth)
+        out = tf.layers.max_pooling2d(out, pool_size=3, strides=2, padding='same')
+        out = residual_block(out)
+        out = residual_block(out)
+        return out
+
+    out = tf.cast(unscaled_images, tf.float32) / 255.
+
+    for depth in depths:
+        out = conv_sequence(out, depth)
+
+    out = tf.layers.flatten(out)
+    out = tf.nn.relu(out)
+    out = tf.layers.dense(out, 256, activation=tf.nn.relu)
+
+    return out
+
 
 @register("mlp")
 def mlp(num_layers=2, num_hidden=64, activation=tf.tanh, layer_norm=False):
@@ -65,6 +100,11 @@ def cnn(**conv_kwargs):
         return nature_cnn(X, **conv_kwargs)
     return network_fn
 
+@register("impala_cnn")
+def impala_cnn(**conv_kwargs):
+    def network_fn(X):
+        return build_impala_cnn(X)
+    return network_fn
 
 @register("cnn_small")
 def cnn_small(**conv_kwargs):
@@ -79,7 +119,6 @@ def cnn_small(**conv_kwargs):
         return h
     return network_fn
 
-
 @register("lstm")
 def lstm(nlstm=128, layer_norm=False):
     """
@@ -136,12 +175,12 @@ def lstm(nlstm=128, layer_norm=False):
 
 
 @register("cnn_lstm")
-def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs):
+def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs):
     def network_fn(X, nenv=1):
         nbatch = X.shape[0]
         nsteps = nbatch // nenv
 
-        h = nature_cnn(X, **conv_kwargs)
+        h = conv_fn(X, **conv_kwargs)
 
         M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1)
         S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states
@@ -161,6 +200,9 @@ def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs):
 
     return network_fn
 
+@register("impala_cnn_lstm")
+def impala_cnn_lstm():
+    return cnn_lstm(nlstm=256, conv_fn=build_impala_cnn)
 
 @register("cnn_lnlstm")
 def cnn_lnlstm(nlstm=128, **conv_kwargs):
@@ -187,7 +229,7 @@ def conv_only(convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], **conv_kwargs):
         out = tf.cast(X, tf.float32) / 255.
         with tf.variable_scope("convnet"):
             for num_outputs, kernel_size, stride in convs:
-                out = layers.convolution2d(out,
+                out = tf.contrib.layers.convolution2d(out,
                                            num_outputs=num_outputs,
                                            kernel_size=kernel_size,
                                            stride=stride,
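Usage note (not part of the patch; the placeholder shape below is illustrative): because build_impala_cnn is registered as "impala_cnn" (and the recurrent variant as "impala_cnn_lstm"), it can be selected by name anywhere baselines accepts a network string, e.g. ppo2.learn(network='impala_cnn', ...). A minimal sketch of calling the builder directly:

import tensorflow as tf
from baselines.common.models import get_network_builder

# A batch of 84x84 RGB frames; any uint8 image placeholder works the same way.
X = tf.placeholder(tf.uint8, [None, 84, 84, 3])

# get_network_builder('impala_cnn') returns the registered factory; calling it with no
# kwargs yields a network_fn that maps observations to 256-d ReLU features.
features = get_network_builder('impala_cnn')()(X)   # shape (None, 256)
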
diff --git a/baselines/common/mpi_adam_optimizer.py b/baselines/common/mpi_adam_optimizer.py
index db7f7a2..dcbcd74 100644
--- a/baselines/common/mpi_adam_optimizer.py
+++ b/baselines/common/mpi_adam_optimizer.py
@@ -9,22 +9,27 @@ except ImportError:
 
 class MpiAdamOptimizer(tf.train.AdamOptimizer):
     """Adam optimizer that averages gradients across mpi processes."""
-    def __init__(self, comm, **kwargs):
+    def __init__(self, comm, mpi_rank_weight=1, **kwargs):
         self.comm = comm
+        self.mpi_rank_weight = mpi_rank_weight
         tf.train.AdamOptimizer.__init__(self, **kwargs)
     def compute_gradients(self, loss, var_list, **kwargs):
         grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs)
         grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None]
-        flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0)
+        flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) * self.mpi_rank_weight
         shapes = [v.shape.as_list() for g, v in grads_and_vars]
         sizes = [int(np.prod(s)) for s in shapes]
-        num_tasks = self.comm.Get_size()
+
+        total_weight = np.zeros(1, np.float32)
+        self.comm.Allreduce(np.array([self.mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)
+        total_weight = total_weight[0]
+
         buf = np.zeros(sum(sizes), np.float32)
         countholder = [0] # Counts how many times _collect_grads has been called
         stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable
         def _collect_grads(flat_grad, np_stat):
             self.comm.Allreduce(flat_grad, buf, op=MPI.SUM)
-            np.divide(buf, float(num_tasks), out=buf)
+            np.divide(buf, float(total_weight), out=buf)
             if countholder[0] % 100 == 0:
                 check_synced(np_stat, self.comm)
             countholder[0] += 1
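The change above replaces the plain mean over ranks with a weighted mean: each rank scales its flattened gradient by mpi_rank_weight before the Allreduce, and every rank divides the summed result by the summed weights. With all weights equal to 1 this reproduces the old behaviour; a rank given weight 0 (e.g. a test worker) keeps its Adam state in sync but contributes nothing to the update. A standalone numpy/mpi4py sketch of the same arithmetic (not part of the patch; the rank split is hypothetical):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
# Hypothetical split: ranks 4 and above are test workers that should not influence training.
mpi_rank_weight = 0.0 if comm.Get_rank() >= 4 else 1.0

local_grad = np.random.randn(10).astype(np.float32)   # stand-in for the flattened gradient
weighted = local_grad * mpi_rank_weight

total_weight = np.zeros(1, np.float32)
comm.Allreduce(np.array([mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)

buf = np.zeros_like(weighted)
comm.Allreduce(weighted, buf, op=MPI.SUM)
buf /= total_weight[0]   # weighted mean of the training ranks' gradients, identical on every rank
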
diff --git a/baselines/ppo2/microbatched_model.py b/baselines/ppo2/microbatched_model.py
index 6735ed4..8d8b688 100644
--- a/baselines/ppo2/microbatched_model.py
+++ b/baselines/ppo2/microbatched_model.py
@@ -8,7 +8,7 @@ class MicrobatchedModel(Model):
     on the entire minibatch causes some overflow
     """
     def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
-                nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size):
+                nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight, microbatch_size):
 
         self.nmicrobatches = nbatch_train // microbatch_size
         self.microbatch_size = microbatch_size
@@ -23,7 +23,8 @@ class MicrobatchedModel(Model):
             nsteps=nsteps,
             ent_coef=ent_coef,
             vf_coef=vf_coef,
-            max_grad_norm=max_grad_norm)
+            max_grad_norm=max_grad_norm,
+            mpi_rank_weight=mpi_rank_weight)
 
         self.grads_ph = [tf.placeholder(dtype=g.dtype, shape=g.shape) for g in self.grads]
         grads_ph_and_vars = list(zip(self.grads_ph, self.var))
diff --git a/baselines/ppo2/model.py b/baselines/ppo2/model.py
index 2326b46..9370d5c 100644
--- a/baselines/ppo2/model.py
+++ b/baselines/ppo2/model.py
@@ -25,7 +25,7 @@ class Model(object):
     - Save load the model
     """
     def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
-                nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size=None):
+                nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight=1, microbatch_size=None):
         self.sess = sess = get_session()
 
         with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE):
@@ -92,7 +92,7 @@ class Model(object):
         params = tf.trainable_variables('ppo2_model')
         # 2. Build our trainer
         if MPI is not None:
-            self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
+            self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, mpi_rank_weight=mpi_rank_weight, epsilon=1e-5)
         else:
             self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
         # 3. Calculate the gradients
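Both classes only thread the new argument through: Model defaults mpi_rank_weight to 1 and hands it to MpiAdamOptimizer, while MicrobatchedModel now requires it and forwards it along to the underlying Model. A sketch of plugging the microbatched variant into training via the patched ppo2.learn shown below (the env, network, and sizes are illustrative; nbatch_train must be divisible by microbatch_size):

from functools import partial

import gym
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.ppo2 import ppo2
from baselines.ppo2.microbatched_model import MicrobatchedModel

venv = DummyVecEnv([lambda: gym.make('CartPole-v0')])

# ppo2.learn supplies every other Model argument, including mpi_rank_weight,
# so only microbatch_size needs to be bound here.
model_fn = partial(MicrobatchedModel, microbatch_size=128)

model = ppo2.learn(network='mlp', env=venv, total_timesteps=10000,
                   nsteps=2048, nminibatches=4, model_fn=model_fn,
                   mpi_rank_weight=1)
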
diff --git a/baselines/ppo2/ppo2.py b/baselines/ppo2/ppo2.py
index 7f3d204..09bc933 100644
--- a/baselines/ppo2/ppo2.py
+++ b/baselines/ppo2/ppo2.py
@@ -21,7 +21,7 @@ def constfn(val):
 
 def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4,
             vf_coef=0.5,  max_grad_norm=0.5, gamma=0.99, lam=0.95,
             log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
-            save_interval=0, load_path=None, model_fn=None, **network_kwargs):
+            save_interval=0, load_path=None, model_fn=None, update_fn=None, init_fn=None, mpi_rank_weight=1, **network_kwargs):
     '''
     Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347)
@@ -105,7 +105,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
 
     model = model_fn(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=nenvs, nbatch_train=nbatch_train,
                     nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef,
-                    max_grad_norm=max_grad_norm)
+                    max_grad_norm=max_grad_norm, mpi_rank_weight=mpi_rank_weight)
 
     if load_path is not None:
         model.load(load_path)
@@ -118,6 +118,9 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
     if eval_env is not None:
         eval_epinfobuf = deque(maxlen=100)
 
+    if init_fn is not None:
+        init_fn()
+
     # Start total timer
     tfirststart = time.perf_counter()
 
@@ -176,31 +179,36 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
         tnow = time.perf_counter()
         # Calculate the fps (frame per second)
         fps = int(nbatch / (tnow - tstart))
+
+        if update_fn is not None:
+            update_fn(update)
+
         if update % log_interval == 0 or update == 1:
             # Calculates if value function is a good predicator of the returns (ev > 1)
             # or if it's just worse than predicting nothing (ev =< 0)
             ev = explained_variance(values, returns)
-            logger.logkv("serial_timesteps", update*nsteps)
-            logger.logkv("nupdates", update)
-            logger.logkv("total_timesteps", update*nbatch)
+            logger.logkv("misc/serial_timesteps", update*nsteps)
+            logger.logkv("misc/nupdates", update)
+            logger.logkv("misc/total_timesteps", update*nbatch)
             logger.logkv("fps", fps)
-            logger.logkv("explained_variance", float(ev))
+            logger.logkv("misc/explained_variance", float(ev))
             logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf]))
             logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf]))
             if eval_env is not None:
                 logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]) )
                 logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]) )
-            logger.logkv('time_elapsed', tnow - tfirststart)
+            logger.logkv('misc/time_elapsed', tnow - tfirststart)
             for (lossval, lossname) in zip(lossvals, model.loss_names):
-                logger.logkv(lossname, lossval)
-            if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
-                logger.dumpkvs()
+                logger.logkv('loss/' + lossname, lossval)
+
+            logger.dumpkvs()
         if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
             checkdir = osp.join(logger.get_dir(), 'checkpoints')
             os.makedirs(checkdir, exist_ok=True)
             savepath = osp.join(checkdir, '%.5i'%update)
             print('Saving to', savepath)
             model.save(savepath)
+
     return model
 # Avoid division error when calculate the mean (in our case if epinfo is empty returns np.nan, not return an error)
 def safemean(xs):
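Besides forwarding mpi_rank_weight to the model, learn() gains two optional callbacks: init_fn runs once before the first rollout and update_fn(update) runs after every optimization update. Logged keys are namespaced under misc/ and loss/, and logger.dumpkvs() is now called unconditionally rather than only on rank 0. A usage sketch for the new hooks (the env, network, and callbacks here are illustrative, not part of the patch):

import gym
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.ppo2 import ppo2

venv = DummyVecEnv([lambda: gym.make('CartPole-v0')])

def init_fn():
    print('about to start collecting rollouts')

def update_fn(update):
    if update % 10 == 0:
        print('finished update', update)

model = ppo2.learn(network='mlp', env=venv, total_timesteps=100000,
                   init_fn=init_fn, update_fn=update_fn,
                   mpi_rank_weight=1)   # set to 0 on evaluation-only ranks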