diff --git a/.gitignore b/.gitignore index 722e942..a41103d 100644 --- a/.gitignore +++ b/.gitignore @@ -34,5 +34,3 @@ src .cache MUJOCO_LOG.TXT - - diff --git a/.travis.yml b/.travis.yml index 5ba3ead..773cb2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,5 +10,5 @@ install: - docker build . -t baselines-test script: - - flake8 --select=F baselines/common + - flake8 --select=F,E999 baselines/common baselines/trpo_mpi baselines/ppo2 baselines/a2c baselines/deepq baselines/acer - docker run baselines-test pytest diff --git a/Dockerfile b/Dockerfile index eeac22a..c16646b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,17 +4,21 @@ RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libo ENV CODE_DIR /root/code ENV VENV /root/venv -COPY . $CODE_DIR/baselines RUN \ pip install virtualenv && \ virtualenv $VENV --python=python3 && \ . $VENV/bin/activate && \ - cd $CODE_DIR && \ - pip install --upgrade pip && \ - pip install -e baselines && \ - pip install pytest + pip install --upgrade pip ENV PATH=$VENV/bin:$PATH + +COPY . $CODE_DIR/baselines WORKDIR $CODE_DIR/baselines +# Clean up pycache and pyc files +RUN rm -rf __pycache__ && \ + find . -name "*.pyc" -delete && \ + pip install -e .[test] + + CMD /bin/bash diff --git a/README.md b/README.md index 197f01a..47e6e93 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,58 @@ pip install pytest pytest ``` +## Subpackages + +## Testing the installation +All unit tests in baselines can be run using pytest runner: +``` +pip install pytest +pytest +``` + +## Training models +Most of the algorithms in the baselines repo are used as follows: +```bash + python -m baselines.common.run --alg= --env= [additional arguments] +``` +### Example 1. PPO with MuJoCo Humanoid +For instance, to train a fully-connected network controlling the MuJoCo humanoid using ppo2 for 20M timesteps: +```bash + python -m baselines.common.run --alg=ppo2 --env=Humanoid-v2 --network=mlp --num-timesteps=2e7 +``` +Note that for MuJoCo environments the fully-connected network is the default, so we can omit `--network=mlp`. +The hyperparameters for both the network and the learning algorithm can be controlled via the command line, for instance: +```bash + python -m baselines.common.run --alg=ppo2 --env=Humanoid-v2 --network=mlp --num-timesteps=2e7 --ent_coef=0.1 --num_hidden=32 --num_layers=3 +``` +will set the entropy coefficient to 0.1, and construct a fully-connected network with 3 layers of 32 hidden units each. +See the docstrings in [common/models.py](common/models.py) for a description of the network parameters for each type of model, and +the docstring for [baselines/ppo2/ppo2.py/learn()](ppo2/ppo2.py) for the description of the ppo2 hyperparameters. +### Example 2. DQN on Atari +DQN with Atari is at this point a classic benchmark. To run the baselines implementation of DQN on Atari Pong: +``` + python -m baselines.common.run --alg=deepq --env=PongNoFrameskip-v4 --num-timesteps=10000000 +``` + +## Saving, loading and visualizing models +The algorithms' serialization API is not properly unified yet; however, there is a simple method to save / restore trained models. +The `--model-path` command-line option loads the TensorFlow state from a given path before training, and saves it after training. +Let's imagine you'd like to train a2c on the MuJoCo humanoid, save the model, and then later visualize what it has learned.
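Under the hood, `--model-path` is plain TensorFlow checkpointing of the trained variables (compare `save_state` in `baselines/common/tf_util.py` further down this diff). A minimal sketch of that idea, assuming a default session that already holds the model's variables; the exact commands follow below:

```python
# Rough sketch of what --model-path amounts to (assumes a tf.Session holding the
# trained variables; mirrors save_state in baselines/common/tf_util.py).
import tensorflow as tf

def save_model(session, path):
    # Persist all variables of the current graph to a checkpoint at `path`.
    tf.train.Saver().save(session, path)

def load_model(session, path):
    # Restore previously saved variables into the (already constructed) graph.
    tf.train.Saver().restore(session, path)
```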
+```bash + python -m baselines.common.run --alg=a2c --env=Humanoid-v2 --num-timesteps=2e7 --model-path=~/models/humanoid_20M_a2c +``` +To load and visualize the model, we'll do the following - load the model, train it for trivial number of steps (say, 10), and then visualize: +```bash + python -m baselines.common.run --alg=a2c --env=Humanoid-v2 --num-timesteps=10 --model-path=~/models/humanoid_20M_a2c --play +``` + + + + + + + ## Subpackages - [A2C](baselines/a2c) diff --git a/baselines/a2c/a2c.py b/baselines/a2c/a2c.py index f1de88a..ab1082c 100644 --- a/baselines/a2c/a2c.py +++ b/baselines/a2c/a2c.py @@ -1,42 +1,48 @@ import os.path as osp import time import joblib -import numpy as np import tensorflow as tf from baselines import logger from baselines.common import set_global_seeds, explained_variance -from baselines.common.runners import AbstractEnvRunner from baselines.common import tf_util +from baselines.common.policies import build_policy + -from baselines.a2c.utils import discount_with_dones from baselines.a2c.utils import Scheduler, make_path, find_trainable_variables -from baselines.a2c.utils import cat_entropy, mse +from baselines.a2c.runner import Runner + +from tensorflow import losses class Model(object): - def __init__(self, policy, ob_space, ac_space, nenvs, nsteps, + def __init__(self, policy, env, nsteps, ent_coef=0.01, vf_coef=0.5, max_grad_norm=0.5, lr=7e-4, alpha=0.99, epsilon=1e-5, total_timesteps=int(80e6), lrschedule='linear'): - sess = tf_util.make_session() + sess = tf_util.get_session() + nenvs = env.num_envs nbatch = nenvs*nsteps - A = tf.placeholder(tf.int32, [nbatch]) + + with tf.variable_scope('a2c_model', reuse=tf.AUTO_REUSE): + step_model = policy(nenvs, 1, sess) + train_model = policy(nbatch, nsteps, sess) + + A = tf.placeholder(train_model.action.dtype, train_model.action.shape) ADV = tf.placeholder(tf.float32, [nbatch]) R = tf.placeholder(tf.float32, [nbatch]) LR = tf.placeholder(tf.float32, []) - step_model = policy(sess, ob_space, ac_space, nenvs, 1, reuse=False) - train_model = policy(sess, ob_space, ac_space, nenvs*nsteps, nsteps, reuse=True) + neglogpac = train_model.pd.neglogp(A) + entropy = tf.reduce_mean(train_model.pd.entropy()) - neglogpac = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=train_model.pi, labels=A) pg_loss = tf.reduce_mean(ADV * neglogpac) - vf_loss = tf.reduce_mean(mse(tf.squeeze(train_model.vf), R)) - entropy = tf.reduce_mean(cat_entropy(train_model.pi)) + vf_loss = losses.mean_squared_error(tf.squeeze(train_model.vf), R) + loss = pg_loss - entropy*ent_coef + vf_loss * vf_coef - params = find_trainable_variables("model") + params = find_trainable_variables("a2c_model") grads = tf.gradients(loss, params) if max_grad_norm is not None: grads, grad_norm = tf.clip_by_global_norm(grads, max_grad_norm) @@ -50,6 +56,7 @@ class Model(object): advs = rewards - values for step in range(len(obs)): cur_lr = lr.value() + td_map = {train_model.X:obs, A:actions, ADV:advs, R:rewards, LR:cur_lr} if states is not None: td_map[train_model.S] = states @@ -82,61 +89,79 @@ class Model(object): self.load = load tf.global_variables_initializer().run(session=sess) -class Runner(AbstractEnvRunner): - def __init__(self, env, model, nsteps=5, gamma=0.99): - super().__init__(env=env, model=model, nsteps=nsteps) - self.gamma = gamma +def learn( + network, + env, + seed=None, + nsteps=5, + total_timesteps=int(80e6), + vf_coef=0.5, + ent_coef=0.01, + max_grad_norm=0.5, + lr=7e-4, + lrschedule='linear', + epsilon=1e-5, + alpha=0.99, + gamma=0.99, + 
log_interval=100, + **network_kwargs): + + ''' + Main entrypoint for the A2C algorithm. Trains a policy with the given network architecture on the given environment using a2c. + + Parameters: + ----------- + + network: policy network architecture. Either a string (mlp, lstm, lnlstm, cnn_lstm, cnn, cnn_small, conv_only - see baselines.common/models.py for the full list) + specifying a standard network architecture, or a function that takes a tensorflow tensor as input and returns a + tuple (output_tensor, extra_feed) where output_tensor is the last network layer output, extra_feed is None for feed-forward + neural nets, and extra_feed is a dictionary describing how to feed state into the network for recurrent neural nets. + See baselines.common/policies.py/lstm for more details on using recurrent nets in policies + + + env: RL environment. Should implement an interface similar to VecEnv (baselines.common/vec_env) or be wrapped with DummyVecEnv (baselines.common/vec_env/dummy_vec_env.py) + + + seed: seed to make the random number sequence in the algorithm reproducible. Defaults to None, which means the seed is taken from the system noise generator (not reproducible) + + nsteps: int, number of steps of the vectorized environment per update (i.e. the batch size is nsteps * nenv where + nenv is the number of environment copies simulated in parallel) + + total_timesteps: int, total number of timesteps to train on (default: 80M) + + vf_coef: float, coefficient in front of the value function loss in the total loss function (default: 0.5) + + ent_coef: float, coefficient in front of the policy entropy in the total loss function (default: 0.01) + + max_grad_norm: float, gradients are clipped to have a global L2 norm no larger than this value (default: 0.5) + + lr: float, learning rate for RMSProp (the current implementation has RMSProp hardcoded in) (default: 7e-4) + + lrschedule: schedule of the learning rate. Can be 'linear', 'constant', or a function [0..1] -> [0..1] that takes the fraction of the training progress as input and + returns the fraction of the learning rate (specified as lr) as output + + epsilon: float, RMSProp epsilon (stabilizes the square root computation in the denominator of the RMSProp update) (default: 1e-5) + + alpha: float, RMSProp decay parameter (default: 0.99) + + gamma: float, reward discounting parameter (default: 0.99) + + log_interval: int, specifies how frequently the logs are printed out (default: 100) + + **network_kwargs: keyword arguments to the policy / network builder. See baselines.common/policies.py/build_policy and the arguments to a particular type of network + For instance, the 'mlp' network architecture has arguments num_hidden and num_layers.
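For orientation, a minimal call of this refactored `learn` entry point might look as follows (an illustrative sketch only, assuming the Atari helpers `make_atari_env` and `VecFrameStack` that appear elsewhere in this diff):

```python
# Hypothetical minimal usage of the new a2c.learn(network, env, ...) signature.
from baselines.common.cmd_util import make_atari_env
from baselines.common.vec_env.vec_frame_stack import VecFrameStack
from baselines.a2c.a2c import learn

# Four parallel Pong environments, frame-stacked as in the Atari training scripts.
env = VecFrameStack(make_atari_env('PongNoFrameskip-v4', num_env=4, seed=0), 4)
model = learn(network='cnn', env=env, seed=0, total_timesteps=int(1e5))
env.close()
```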
+ + ''' + - def run(self): - mb_obs, mb_rewards, mb_actions, mb_values, mb_dones = [],[],[],[],[] - mb_states = self.states - for n in range(self.nsteps): - actions, values, states, _ = self.model.step(self.obs, self.states, self.dones) - mb_obs.append(np.copy(self.obs)) - mb_actions.append(actions) - mb_values.append(values) - mb_dones.append(self.dones) - obs, rewards, dones, _ = self.env.step(actions) - self.states = states - self.dones = dones - for n, done in enumerate(dones): - if done: - self.obs[n] = self.obs[n]*0 - self.obs = obs - mb_rewards.append(rewards) - mb_dones.append(self.dones) - #batch of steps to batch of rollouts - mb_obs = np.asarray(mb_obs, dtype=np.uint8).swapaxes(1, 0).reshape(self.batch_ob_shape) - mb_rewards = np.asarray(mb_rewards, dtype=np.float32).swapaxes(1, 0) - mb_actions = np.asarray(mb_actions, dtype=np.int32).swapaxes(1, 0) - mb_values = np.asarray(mb_values, dtype=np.float32).swapaxes(1, 0) - mb_dones = np.asarray(mb_dones, dtype=np.bool).swapaxes(1, 0) - mb_masks = mb_dones[:, :-1] - mb_dones = mb_dones[:, 1:] - last_values = self.model.value(self.obs, self.states, self.dones).tolist() - #discount/bootstrap off value fn - for n, (rewards, dones, value) in enumerate(zip(mb_rewards, mb_dones, last_values)): - rewards = rewards.tolist() - dones = dones.tolist() - if dones[-1] == 0: - rewards = discount_with_dones(rewards+[value], dones+[0], self.gamma)[:-1] - else: - rewards = discount_with_dones(rewards, dones, self.gamma) - mb_rewards[n] = rewards - mb_rewards = mb_rewards.flatten() - mb_actions = mb_actions.flatten() - mb_values = mb_values.flatten() - mb_masks = mb_masks.flatten() - return mb_obs, mb_states, mb_rewards, mb_masks, mb_actions, mb_values -def learn(policy, env, seed, nsteps=5, total_timesteps=int(80e6), vf_coef=0.5, ent_coef=0.01, max_grad_norm=0.5, lr=7e-4, lrschedule='linear', epsilon=1e-5, alpha=0.99, gamma=0.99, log_interval=100): set_global_seeds(seed) nenvs = env.num_envs - ob_space = env.observation_space - ac_space = env.action_space - model = Model(policy=policy, ob_space=ob_space, ac_space=ac_space, nenvs=nenvs, nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef, + policy = build_policy(env, network, **network_kwargs) + + model = Model(policy=policy, env=env, nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef, max_grad_norm=max_grad_norm, lr=lr, alpha=alpha, epsilon=epsilon, total_timesteps=total_timesteps, lrschedule=lrschedule) runner = Runner(env, model, nsteps=nsteps, gamma=gamma) @@ -158,3 +183,4 @@ def learn(policy, env, seed, nsteps=5, total_timesteps=int(80e6), vf_coef=0.5, e logger.dump_tabular() env.close() return model + diff --git a/baselines/a2c/policies.py b/baselines/a2c/policies.py deleted file mode 100644 index 6fbbb14..0000000 --- a/baselines/a2c/policies.py +++ /dev/null @@ -1,146 +0,0 @@ -import numpy as np -import tensorflow as tf -from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch, lstm, lnlstm -from baselines.common.distributions import make_pdtype -from baselines.common.input import observation_input - -def nature_cnn(unscaled_images, **conv_kwargs): - """ - CNN from Nature paper. - """ - scaled_images = tf.cast(unscaled_images, tf.float32) / 255. 
- activ = tf.nn.relu - h = activ(conv(scaled_images, 'c1', nf=32, rf=8, stride=4, init_scale=np.sqrt(2), - **conv_kwargs)) - h2 = activ(conv(h, 'c2', nf=64, rf=4, stride=2, init_scale=np.sqrt(2), **conv_kwargs)) - h3 = activ(conv(h2, 'c3', nf=64, rf=3, stride=1, init_scale=np.sqrt(2), **conv_kwargs)) - h3 = conv_to_fc(h3) - return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2))) - -class LnLstmPolicy(object): - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, nlstm=256, reuse=False): - nenv = nbatch // nsteps - X, processed_x = observation_input(ob_space, nbatch) - M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) - S = tf.placeholder(tf.float32, [nenv, nlstm*2]) #states - self.pdtype = make_pdtype(ac_space) - with tf.variable_scope("model", reuse=reuse): - h = nature_cnn(processed_x) - xs = batch_to_seq(h, nenv, nsteps) - ms = batch_to_seq(M, nenv, nsteps) - h5, snew = lnlstm(xs, ms, S, 'lstm1', nh=nlstm) - h5 = seq_to_batch(h5) - vf = fc(h5, 'v', 1) - self.pd, self.pi = self.pdtype.pdfromlatent(h5) - - v0 = vf[:, 0] - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = np.zeros((nenv, nlstm*2), dtype=np.float32) - - def step(ob, state, mask): - return sess.run([a0, v0, snew, neglogp0], {X:ob, S:state, M:mask}) - - def value(ob, state, mask): - return sess.run(v0, {X:ob, S:state, M:mask}) - - self.X = X - self.M = M - self.S = S - self.vf = vf - self.step = step - self.value = value - -class LstmPolicy(object): - - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, nlstm=256, reuse=False): - nenv = nbatch // nsteps - self.pdtype = make_pdtype(ac_space) - X, processed_x = observation_input(ob_space, nbatch) - - M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) - S = tf.placeholder(tf.float32, [nenv, nlstm*2]) #states - with tf.variable_scope("model", reuse=reuse): - h = nature_cnn(X) - xs = batch_to_seq(h, nenv, nsteps) - ms = batch_to_seq(M, nenv, nsteps) - h5, snew = lstm(xs, ms, S, 'lstm1', nh=nlstm) - h5 = seq_to_batch(h5) - vf = fc(h5, 'v', 1) - self.pd, self.pi = self.pdtype.pdfromlatent(h5) - - v0 = vf[:, 0] - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = np.zeros((nenv, nlstm*2), dtype=np.float32) - - def step(ob, state, mask): - return sess.run([a0, v0, snew, neglogp0], {X:ob, S:state, M:mask}) - - def value(ob, state, mask): - return sess.run(v0, {X:ob, S:state, M:mask}) - - self.X = X - self.M = M - self.S = S - self.vf = vf - self.step = step - self.value = value - -class CnnPolicy(object): - - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, reuse=False, **conv_kwargs): #pylint: disable=W0613 - self.pdtype = make_pdtype(ac_space) - X, processed_x = observation_input(ob_space, nbatch) - with tf.variable_scope("model", reuse=reuse): - h = nature_cnn(processed_x, **conv_kwargs) - vf = fc(h, 'v', 1)[:,0] - self.pd, self.pi = self.pdtype.pdfromlatent(h, init_scale=0.01) - - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = None - - def step(ob, *_args, **_kwargs): - a, v, neglogp = sess.run([a0, vf, neglogp0], {X:ob}) - return a, v, self.initial_state, neglogp - - def value(ob, *_args, **_kwargs): - return sess.run(vf, {X:ob}) - - self.X = X - self.vf = vf - self.step = step - self.value = value - -class MlpPolicy(object): - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, reuse=False): #pylint: disable=W0613 - self.pdtype = make_pdtype(ac_space) - with tf.variable_scope("model", reuse=reuse): - X, processed_x = 
observation_input(ob_space, nbatch) - activ = tf.tanh - processed_x = tf.layers.flatten(processed_x) - pi_h1 = activ(fc(processed_x, 'pi_fc1', nh=64, init_scale=np.sqrt(2))) - pi_h2 = activ(fc(pi_h1, 'pi_fc2', nh=64, init_scale=np.sqrt(2))) - vf_h1 = activ(fc(processed_x, 'vf_fc1', nh=64, init_scale=np.sqrt(2))) - vf_h2 = activ(fc(vf_h1, 'vf_fc2', nh=64, init_scale=np.sqrt(2))) - vf = fc(vf_h2, 'vf', 1)[:,0] - - self.pd, self.pi = self.pdtype.pdfromlatent(pi_h2, init_scale=0.01) - - - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = None - - def step(ob, *_args, **_kwargs): - a, v, neglogp = sess.run([a0, vf, neglogp0], {X:ob}) - return a, v, self.initial_state, neglogp - - def value(ob, *_args, **_kwargs): - return sess.run(vf, {X:ob}) - - self.X = X - self.vf = vf - self.step = step - self.value = value diff --git a/baselines/a2c/run_atari.py b/baselines/a2c/run_atari.py deleted file mode 100644 index b09d9bb..0000000 --- a/baselines/a2c/run_atari.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python3 - -from baselines import logger -from baselines.common.cmd_util import make_atari_env, atari_arg_parser -from baselines.common.vec_env.vec_frame_stack import VecFrameStack -from baselines.a2c.a2c import learn -from baselines.ppo2.policies import CnnPolicy, LstmPolicy, LnLstmPolicy - -def train(env_id, num_timesteps, seed, policy, lrschedule, num_env): - if policy == 'cnn': - policy_fn = CnnPolicy - elif policy == 'lstm': - policy_fn = LstmPolicy - elif policy == 'lnlstm': - policy_fn = LnLstmPolicy - env = VecFrameStack(make_atari_env(env_id, num_env, seed), 4) - learn(policy_fn, env, seed, total_timesteps=int(num_timesteps * 1.1), lrschedule=lrschedule) - env.close() - -def main(): - parser = atari_arg_parser() - parser.add_argument('--policy', help='Policy architecture', choices=['cnn', 'lstm', 'lnlstm'], default='cnn') - parser.add_argument('--lrschedule', help='Learning rate schedule', choices=['constant', 'linear'], default='constant') - args = parser.parse_args() - logger.configure() - train(args.env, num_timesteps=args.num_timesteps, seed=args.seed, - policy=args.policy, lrschedule=args.lrschedule, num_env=16) - -if __name__ == '__main__': - main() diff --git a/baselines/a2c/utils.py b/baselines/a2c/utils.py index a7610eb..f38085b 100644 --- a/baselines/a2c/utils.py +++ b/baselines/a2c/utils.py @@ -1,8 +1,6 @@ import os -import gym import numpy as np import tensorflow as tf -from gym import spaces from collections import deque def sample(logits): @@ -10,18 +8,15 @@ def sample(logits): return tf.argmax(logits - tf.log(-tf.log(noise)), 1) def cat_entropy(logits): - a0 = logits - tf.reduce_max(logits, 1, keep_dims=True) + a0 = logits - tf.reduce_max(logits, 1, keepdims=True) ea0 = tf.exp(a0) - z0 = tf.reduce_sum(ea0, 1, keep_dims=True) + z0 = tf.reduce_sum(ea0, 1, keepdims=True) p0 = ea0 / z0 return tf.reduce_sum(p0 * (tf.log(z0) - a0), 1) def cat_entropy_softmax(p0): return - tf.reduce_sum(p0 * tf.log(p0 + 1e-6), axis = 1) -def mse(pred, target): - return tf.square(pred-target)/2. 
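Note that the removed `mse` helper returned `tf.square(pred - target) / 2.`, whereas the value loss in `a2c.py` now uses `tf.losses.mean_squared_error`, which averages the plain squared error; for the same `vf_coef` the value-loss term is therefore effectively twice as large as before. A small NumPy sketch of the two formulas (illustrative only):

```python
# Illustrates the scale change between the old mse helper and tf.losses.mean_squared_error.
import numpy as np

pred = np.array([1.0, 2.0, 3.0])
target = np.array([1.5, 2.0, 2.0])

old_vf_loss = np.mean(np.square(pred - target) / 2.0)  # reduce_mean(mse(vf, R)) before this diff
new_vf_loss = np.mean(np.square(pred - target))        # tf.losses.mean_squared_error(vf, R) after
assert np.isclose(new_vf_loss, 2.0 * old_vf_loss)
```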
- def ortho_init(scale=1.0): def _ortho_init(shape, dtype, partition_info=None): #lasagne ortho init for tf @@ -58,7 +53,7 @@ def conv(x, scope, *, nf, rf, stride, pad='VALID', init_scale=1.0, data_format=' b = tf.get_variable("b", bias_var_shape, initializer=tf.constant_initializer(0.0)) if not one_dim_bias and data_format == 'NHWC': b = tf.reshape(b, bshape) - return b + tf.nn.conv2d(x, w, strides=strides, padding=pad, data_format=data_format) + return tf.nn.conv2d(x, w, strides=strides, padding=pad, data_format=data_format) + b def fc(x, scope, nh, *, init_scale=1.0, init_bias=0.0): with tf.variable_scope(scope): @@ -85,7 +80,6 @@ def seq_to_batch(h, flat = False): def lstm(xs, ms, s, scope, nh, init_scale=1.0): nbatch, nin = [v.value for v in xs[0].get_shape()] - nsteps = len(xs) with tf.variable_scope(scope): wx = tf.get_variable("wx", [nin, nh*4], initializer=ortho_init(init_scale)) wh = tf.get_variable("wh", [nh, nh*4], initializer=ortho_init(init_scale)) @@ -115,7 +109,6 @@ def _ln(x, g, b, e=1e-5, axes=[1]): def lnlstm(xs, ms, s, scope, nh, init_scale=1.0): nbatch, nin = [v.value for v in xs[0].get_shape()] - nsteps = len(xs) with tf.variable_scope(scope): wx = tf.get_variable("wx", [nin, nh*4], initializer=ortho_init(init_scale)) gx = tf.get_variable("gx", [nh*4], initializer=tf.constant_initializer(1.0)) @@ -160,8 +153,7 @@ def discount_with_dones(rewards, dones, gamma): return discounted[::-1] def find_trainable_variables(key): - with tf.variable_scope(key): - return tf.trainable_variables() + return tf.trainable_variables(key) def make_path(f): return os.makedirs(f, exist_ok=True) diff --git a/baselines/acer/acer_simple.py b/baselines/acer/acer_simple.py deleted file mode 100644 index bed486a..0000000 --- a/baselines/acer/acer_simple.py +++ /dev/null @@ -1,348 +0,0 @@ -import time -import joblib -import numpy as np -import tensorflow as tf -from baselines import logger - -from baselines.common import set_global_seeds -from baselines.common.runners import AbstractEnvRunner - -from baselines.a2c.utils import batch_to_seq, seq_to_batch -from baselines.a2c.utils import Scheduler, make_path, find_trainable_variables -from baselines.a2c.utils import cat_entropy_softmax -from baselines.a2c.utils import EpisodeStats -from baselines.a2c.utils import get_by_index, check_shape, avg_norm, gradient_add, q_explained_variance -from baselines.acer.buffer import Buffer - -import os.path as osp - -# remove last step -def strip(var, nenvs, nsteps, flat = False): - vars = batch_to_seq(var, nenvs, nsteps + 1, flat) - return seq_to_batch(vars[:-1], flat) - -def q_retrace(R, D, q_i, v, rho_i, nenvs, nsteps, gamma): - """ - Calculates q_retrace targets - - :param R: Rewards - :param D: Dones - :param q_i: Q values for actions taken - :param v: V values - :param rho_i: Importance weight for each action - :return: Q_retrace values - """ - rho_bar = batch_to_seq(tf.minimum(1.0, rho_i), nenvs, nsteps, True) # list of len steps, shape [nenvs] - rs = batch_to_seq(R, nenvs, nsteps, True) # list of len steps, shape [nenvs] - ds = batch_to_seq(D, nenvs, nsteps, True) # list of len steps, shape [nenvs] - q_is = batch_to_seq(q_i, nenvs, nsteps, True) - vs = batch_to_seq(v, nenvs, nsteps + 1, True) - v_final = vs[-1] - qret = v_final - qrets = [] - for i in range(nsteps - 1, -1, -1): - check_shape([qret, ds[i], rs[i], rho_bar[i], q_is[i], vs[i]], [[nenvs]] * 6) - qret = rs[i] + gamma * qret * (1.0 - ds[i]) - qrets.append(qret) - qret = (rho_bar[i] * (qret - q_is[i])) + vs[i] - qrets = qrets[::-1] - qret = 
seq_to_batch(qrets, flat=True) - return qret - -# For ACER with PPO clipping instead of trust region -# def clip(ratio, eps_clip): -# # assume 0 <= eps_clip <= 1 -# return tf.minimum(1 + eps_clip, tf.maximum(1 - eps_clip, ratio)) - -class Model(object): - def __init__(self, policy, ob_space, ac_space, nenvs, nsteps, nstack, num_procs, - ent_coef, q_coef, gamma, max_grad_norm, lr, - rprop_alpha, rprop_epsilon, total_timesteps, lrschedule, - c, trust_region, alpha, delta): - config = tf.ConfigProto(allow_soft_placement=True, - intra_op_parallelism_threads=num_procs, - inter_op_parallelism_threads=num_procs) - sess = tf.Session(config=config) - nact = ac_space.n - nbatch = nenvs * nsteps - - A = tf.placeholder(tf.int32, [nbatch]) # actions - D = tf.placeholder(tf.float32, [nbatch]) # dones - R = tf.placeholder(tf.float32, [nbatch]) # rewards, not returns - MU = tf.placeholder(tf.float32, [nbatch, nact]) # mu's - LR = tf.placeholder(tf.float32, []) - eps = 1e-6 - - step_model = policy(sess, ob_space, ac_space, nenvs, 1, nstack, reuse=False) - train_model = policy(sess, ob_space, ac_space, nenvs, nsteps + 1, nstack, reuse=True) - - params = find_trainable_variables("model") - print("Params {}".format(len(params))) - for var in params: - print(var) - - # create polyak averaged model - ema = tf.train.ExponentialMovingAverage(alpha) - ema_apply_op = ema.apply(params) - - def custom_getter(getter, *args, **kwargs): - v = ema.average(getter(*args, **kwargs)) - print(v.name) - return v - - with tf.variable_scope("", custom_getter=custom_getter, reuse=True): - polyak_model = policy(sess, ob_space, ac_space, nenvs, nsteps + 1, nstack, reuse=True) - - # Notation: (var) = batch variable, (var)s = seqeuence variable, (var)_i = variable index by action at step i - v = tf.reduce_sum(train_model.pi * train_model.q, axis = -1) # shape is [nenvs * (nsteps + 1)] - - # strip off last step - f, f_pol, q = map(lambda var: strip(var, nenvs, nsteps), [train_model.pi, polyak_model.pi, train_model.q]) - # Get pi and q values for actions taken - f_i = get_by_index(f, A) - q_i = get_by_index(q, A) - - # Compute ratios for importance truncation - rho = f / (MU + eps) - rho_i = get_by_index(rho, A) - - # Calculate Q_retrace targets - qret = q_retrace(R, D, q_i, v, rho_i, nenvs, nsteps, gamma) - - # Calculate losses - # Entropy - entropy = tf.reduce_mean(cat_entropy_softmax(f)) - - # Policy Graident loss, with truncated importance sampling & bias correction - v = strip(v, nenvs, nsteps, True) - check_shape([qret, v, rho_i, f_i], [[nenvs * nsteps]] * 4) - check_shape([rho, f, q], [[nenvs * nsteps, nact]] * 2) - - # Truncated importance sampling - adv = qret - v - logf = tf.log(f_i + eps) - gain_f = logf * tf.stop_gradient(adv * tf.minimum(c, rho_i)) # [nenvs * nsteps] - loss_f = -tf.reduce_mean(gain_f) - - # Bias correction for the truncation - adv_bc = (q - tf.reshape(v, [nenvs * nsteps, 1])) # [nenvs * nsteps, nact] - logf_bc = tf.log(f + eps) # / (f_old + eps) - check_shape([adv_bc, logf_bc], [[nenvs * nsteps, nact]]*2) - gain_bc = tf.reduce_sum(logf_bc * tf.stop_gradient(adv_bc * tf.nn.relu(1.0 - (c / (rho + eps))) * f), axis = 1) #IMP: This is sum, as expectation wrt f - loss_bc= -tf.reduce_mean(gain_bc) - - loss_policy = loss_f + loss_bc - - # Value/Q function loss, and explained variance - check_shape([qret, q_i], [[nenvs * nsteps]]*2) - ev = q_explained_variance(tf.reshape(q_i, [nenvs, nsteps]), tf.reshape(qret, [nenvs, nsteps])) - loss_q = tf.reduce_mean(tf.square(tf.stop_gradient(qret) - q_i)*0.5) - - # Net loss - 
check_shape([loss_policy, loss_q, entropy], [[]] * 3) - loss = loss_policy + q_coef * loss_q - ent_coef * entropy - - if trust_region: - g = tf.gradients(- (loss_policy - ent_coef * entropy) * nsteps * nenvs, f) #[nenvs * nsteps, nact] - # k = tf.gradients(KL(f_pol || f), f) - k = - f_pol / (f + eps) #[nenvs * nsteps, nact] # Directly computed gradient of KL divergence wrt f - k_dot_g = tf.reduce_sum(k * g, axis=-1) - adj = tf.maximum(0.0, (tf.reduce_sum(k * g, axis=-1) - delta) / (tf.reduce_sum(tf.square(k), axis=-1) + eps)) #[nenvs * nsteps] - - # Calculate stats (before doing adjustment) for logging. - avg_norm_k = avg_norm(k) - avg_norm_g = avg_norm(g) - avg_norm_k_dot_g = tf.reduce_mean(tf.abs(k_dot_g)) - avg_norm_adj = tf.reduce_mean(tf.abs(adj)) - - g = g - tf.reshape(adj, [nenvs * nsteps, 1]) * k - grads_f = -g/(nenvs*nsteps) # These are turst region adjusted gradients wrt f ie statistics of policy pi - grads_policy = tf.gradients(f, params, grads_f) - grads_q = tf.gradients(loss_q * q_coef, params) - grads = [gradient_add(g1, g2, param) for (g1, g2, param) in zip(grads_policy, grads_q, params)] - - avg_norm_grads_f = avg_norm(grads_f) * (nsteps * nenvs) - norm_grads_q = tf.global_norm(grads_q) - norm_grads_policy = tf.global_norm(grads_policy) - else: - grads = tf.gradients(loss, params) - - if max_grad_norm is not None: - grads, norm_grads = tf.clip_by_global_norm(grads, max_grad_norm) - grads = list(zip(grads, params)) - trainer = tf.train.RMSPropOptimizer(learning_rate=LR, decay=rprop_alpha, epsilon=rprop_epsilon) - _opt_op = trainer.apply_gradients(grads) - - # so when you call _train, you first do the gradient step, then you apply ema - with tf.control_dependencies([_opt_op]): - _train = tf.group(ema_apply_op) - - lr = Scheduler(v=lr, nvalues=total_timesteps, schedule=lrschedule) - - # Ops/Summaries to run, and their names for logging - run_ops = [_train, loss, loss_q, entropy, loss_policy, loss_f, loss_bc, ev, norm_grads] - names_ops = ['loss', 'loss_q', 'entropy', 'loss_policy', 'loss_f', 'loss_bc', 'explained_variance', - 'norm_grads'] - if trust_region: - run_ops = run_ops + [norm_grads_q, norm_grads_policy, avg_norm_grads_f, avg_norm_k, avg_norm_g, avg_norm_k_dot_g, - avg_norm_adj] - names_ops = names_ops + ['norm_grads_q', 'norm_grads_policy', 'avg_norm_grads_f', 'avg_norm_k', 'avg_norm_g', - 'avg_norm_k_dot_g', 'avg_norm_adj'] - - def train(obs, actions, rewards, dones, mus, states, masks, steps): - cur_lr = lr.value_steps(steps) - td_map = {train_model.X: obs, polyak_model.X: obs, A: actions, R: rewards, D: dones, MU: mus, LR: cur_lr} - if states != []: - td_map[train_model.S] = states - td_map[train_model.M] = masks - td_map[polyak_model.S] = states - td_map[polyak_model.M] = masks - return names_ops, sess.run(run_ops, td_map)[1:] # strip off _train - - def save(save_path): - ps = sess.run(params) - make_path(osp.dirname(save_path)) - joblib.dump(ps, save_path) - - self.train = train - self.save = save - self.train_model = train_model - self.step_model = step_model - self.step = step_model.step - self.initial_state = step_model.initial_state - tf.global_variables_initializer().run(session=sess) - -class Runner(AbstractEnvRunner): - def __init__(self, env, model, nsteps, nstack): - super().__init__(env=env, model=model, nsteps=nsteps) - self.nstack = nstack - nh, nw, nc = env.observation_space.shape - self.nc = nc # nc = 1 for atari, but just in case - self.nenv = nenv = env.num_envs - self.nact = env.action_space.n - self.nbatch = nenv * nsteps - self.batch_ob_shape 
= (nenv*(nsteps+1), nh, nw, nc*nstack) - self.obs = np.zeros((nenv, nh, nw, nc * nstack), dtype=np.uint8) - obs = env.reset() - self.update_obs(obs) - - def update_obs(self, obs, dones=None): - if dones is not None: - self.obs *= (1 - dones.astype(np.uint8))[:, None, None, None] - self.obs = np.roll(self.obs, shift=-self.nc, axis=3) - self.obs[:, :, :, -self.nc:] = obs[:, :, :, :] - - def run(self): - enc_obs = np.split(self.obs, self.nstack, axis=3) # so now list of obs steps - mb_obs, mb_actions, mb_mus, mb_dones, mb_rewards = [], [], [], [], [] - for _ in range(self.nsteps): - actions, mus, states = self.model.step(self.obs, state=self.states, mask=self.dones) - mb_obs.append(np.copy(self.obs)) - mb_actions.append(actions) - mb_mus.append(mus) - mb_dones.append(self.dones) - obs, rewards, dones, _ = self.env.step(actions) - # states information for statefull models like LSTM - self.states = states - self.dones = dones - self.update_obs(obs, dones) - mb_rewards.append(rewards) - enc_obs.append(obs) - mb_obs.append(np.copy(self.obs)) - mb_dones.append(self.dones) - - enc_obs = np.asarray(enc_obs, dtype=np.uint8).swapaxes(1, 0) - mb_obs = np.asarray(mb_obs, dtype=np.uint8).swapaxes(1, 0) - mb_actions = np.asarray(mb_actions, dtype=np.int32).swapaxes(1, 0) - mb_rewards = np.asarray(mb_rewards, dtype=np.float32).swapaxes(1, 0) - mb_mus = np.asarray(mb_mus, dtype=np.float32).swapaxes(1, 0) - - mb_dones = np.asarray(mb_dones, dtype=np.bool).swapaxes(1, 0) - - mb_masks = mb_dones # Used for statefull models like LSTM's to mask state when done - mb_dones = mb_dones[:, 1:] # Used for calculating returns. The dones array is now aligned with rewards - - # shapes are now [nenv, nsteps, []] - # When pulling from buffer, arrays will now be reshaped in place, preventing a deep copy. - - return enc_obs, mb_obs, mb_actions, mb_rewards, mb_mus, mb_dones, mb_masks - -class Acer(): - def __init__(self, runner, model, buffer, log_interval): - self.runner = runner - self.model = model - self.buffer = buffer - self.log_interval = log_interval - self.tstart = None - self.episode_stats = EpisodeStats(runner.nsteps, runner.nenv) - self.steps = None - - def call(self, on_policy): - runner, model, buffer, steps = self.runner, self.model, self.buffer, self.steps - if on_policy: - enc_obs, obs, actions, rewards, mus, dones, masks = runner.run() - self.episode_stats.feed(rewards, dones) - if buffer is not None: - buffer.put(enc_obs, actions, rewards, mus, dones, masks) - else: - # get obs, actions, rewards, mus, dones from buffer. - obs, actions, rewards, mus, dones, masks = buffer.get() - - # reshape stuff correctly - obs = obs.reshape(runner.batch_ob_shape) - actions = actions.reshape([runner.nbatch]) - rewards = rewards.reshape([runner.nbatch]) - mus = mus.reshape([runner.nbatch, runner.nact]) - dones = dones.reshape([runner.nbatch]) - masks = masks.reshape([runner.batch_ob_shape[0]]) - - names_ops, values_ops = model.train(obs, actions, rewards, dones, mus, model.initial_state, masks, steps) - - if on_policy and (int(steps/runner.nbatch) % self.log_interval == 0): - logger.record_tabular("total_timesteps", steps) - logger.record_tabular("fps", int(steps/(time.time() - self.tstart))) - # IMP: In EpisodicLife env, during training, we get done=True at each loss of life, not just at the terminal state. - # Thus, this is mean until end of life, not end of episode. - # For true episode rewards, see the monitor files in the log folder. 
- logger.record_tabular("mean_episode_length", self.episode_stats.mean_length()) - logger.record_tabular("mean_episode_reward", self.episode_stats.mean_reward()) - for name, val in zip(names_ops, values_ops): - logger.record_tabular(name, float(val)) - logger.dump_tabular() - - -def learn(policy, env, seed, nsteps=20, nstack=4, total_timesteps=int(80e6), q_coef=0.5, ent_coef=0.01, - max_grad_norm=10, lr=7e-4, lrschedule='linear', rprop_epsilon=1e-5, rprop_alpha=0.99, gamma=0.99, - log_interval=100, buffer_size=50000, replay_ratio=4, replay_start=10000, c=10.0, - trust_region=True, alpha=0.99, delta=1): - print("Running Acer Simple") - print(locals()) - tf.reset_default_graph() - set_global_seeds(seed) - - nenvs = env.num_envs - ob_space = env.observation_space - ac_space = env.action_space - num_procs = len(env.remotes) # HACK - model = Model(policy=policy, ob_space=ob_space, ac_space=ac_space, nenvs=nenvs, nsteps=nsteps, nstack=nstack, - num_procs=num_procs, ent_coef=ent_coef, q_coef=q_coef, gamma=gamma, - max_grad_norm=max_grad_norm, lr=lr, rprop_alpha=rprop_alpha, rprop_epsilon=rprop_epsilon, - total_timesteps=total_timesteps, lrschedule=lrschedule, c=c, - trust_region=trust_region, alpha=alpha, delta=delta) - - runner = Runner(env=env, model=model, nsteps=nsteps, nstack=nstack) - if replay_ratio > 0: - buffer = Buffer(env=env, nsteps=nsteps, nstack=nstack, size=buffer_size) - else: - buffer = None - nbatch = nenvs*nsteps - acer = Acer(runner, model, buffer, log_interval) - acer.tstart = time.time() - for acer.steps in range(0, total_timesteps, nbatch): #nbatch samples, 1 on_policy call and multiple off-policy calls - acer.call(on_policy=True) - if replay_ratio > 0 and buffer.has_atleast(replay_start): - n = np.random.poisson(replay_ratio) - for _ in range(n): - acer.call(on_policy=False) # no simulation steps in this - - env.close() diff --git a/baselines/acer/policies.py b/baselines/acer/policies.py index 627c400..6dad6f3 100644 --- a/baselines/acer/policies.py +++ b/baselines/acer/policies.py @@ -1,6 +1,6 @@ import numpy as np import tensorflow as tf -from baselines.ppo2.policies import nature_cnn +from baselines.common.policies import nature_cnn from baselines.a2c.utils import fc, batch_to_seq, seq_to_batch, lstm, sample @@ -18,11 +18,13 @@ class AcerCnnPolicy(object): pi = tf.nn.softmax(pi_logits) q = fc(h, 'q', nact) - a = sample(pi_logits) # could change this to use self.pi instead + a = sample(tf.nn.softmax(pi_logits)) # could change this to use self.pi instead self.initial_state = [] # not stateful self.X = X self.pi = pi # actual policy params now + self.pi_logits = pi_logits self.q = q + self.vf = q def step(ob, *args, **kwargs): # returns actions, mus, states diff --git a/baselines/acer/run_atari.py b/baselines/acer/run_atari.py deleted file mode 100644 index cce979e..0000000 --- a/baselines/acer/run_atari.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python3 -from baselines import logger -from baselines.acer.acer_simple import learn -from baselines.acer.policies import AcerCnnPolicy, AcerLstmPolicy -from baselines.common.cmd_util import make_atari_env, atari_arg_parser - -def train(env_id, num_timesteps, seed, policy, lrschedule, num_cpu): - env = make_atari_env(env_id, num_cpu, seed) - if policy == 'cnn': - policy_fn = AcerCnnPolicy - elif policy == 'lstm': - policy_fn = AcerLstmPolicy - else: - print("Policy {} not implemented".format(policy)) - return - learn(policy_fn, env, seed, total_timesteps=int(num_timesteps * 1.1), lrschedule=lrschedule) - env.close() - -def 
main(): - parser = atari_arg_parser() - parser.add_argument('--policy', help='Policy architecture', choices=['cnn', 'lstm', 'lnlstm'], default='cnn') - parser.add_argument('--lrschedule', help='Learning rate schedule', choices=['constant', 'linear'], default='constant') - parser.add_argument('--logdir', help ='Directory for logging') - args = parser.parse_args() - logger.configure(args.logdir) - train(args.env, num_timesteps=args.num_timesteps, seed=args.seed, - policy=args.policy, lrschedule=args.lrschedule, num_cpu=16) - -if __name__ == '__main__': - main() diff --git a/baselines/acktr/acktr_disc.py b/baselines/acktr/acktr_disc.py index a8b77b6..93eaccc 100644 --- a/baselines/acktr/acktr_disc.py +++ b/baselines/acktr/acktr_disc.py @@ -6,11 +6,12 @@ import tensorflow as tf from baselines import logger from baselines.common import set_global_seeds, explained_variance +from baselines.common.policies import build_policy +from baselines.common.tf_util import get_session -from baselines.a2c.a2c import Runner +from baselines.a2c.runner import Runner from baselines.a2c.utils import discount_with_dones from baselines.a2c.utils import Scheduler, find_trainable_variables -from baselines.a2c.utils import cat_entropy, mse from baselines.acktr import kfac @@ -19,11 +20,8 @@ class Model(object): def __init__(self, policy, ob_space, ac_space, nenvs,total_timesteps, nprocs=32, nsteps=20, ent_coef=0.01, vf_coef=0.5, vf_fisher_coef=1.0, lr=0.25, max_grad_norm=0.5, kfac_clip=0.001, lrschedule='linear'): - config = tf.ConfigProto(allow_soft_placement=True, - intra_op_parallelism_threads=nprocs, - inter_op_parallelism_threads=nprocs) - config.gpu_options.allow_growth = True - self.sess = sess = tf.Session(config=config) + + self.sess = sess = get_session() nact = ac_space.n nbatch = nenvs * nsteps A = tf.placeholder(tf.int32, [nbatch]) @@ -32,27 +30,28 @@ class Model(object): PG_LR = tf.placeholder(tf.float32, []) VF_LR = tf.placeholder(tf.float32, []) - self.model = step_model = policy(sess, ob_space, ac_space, nenvs, 1, reuse=False) - self.model2 = train_model = policy(sess, ob_space, ac_space, nenvs*nsteps, nsteps, reuse=True) + with tf.variable_scope('acktr_model', reuse=tf.AUTO_REUSE): + self.model = step_model = policy(nenvs, 1, sess=sess) + self.model2 = train_model = policy(nenvs*nsteps, nsteps, sess=sess) - logpac = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=train_model.pi, labels=A) + neglogpac = train_model.pd.neglogp(A) self.logits = logits = train_model.pi ##training loss - pg_loss = tf.reduce_mean(ADV*logpac) - entropy = tf.reduce_mean(cat_entropy(train_model.pi)) + pg_loss = tf.reduce_mean(ADV*neglogpac) + entropy = tf.reduce_mean(train_model.pd.entropy()) pg_loss = pg_loss - ent_coef * entropy - vf_loss = tf.reduce_mean(mse(tf.squeeze(train_model.vf), R)) + vf_loss = tf.losses.mean_squared_error(tf.squeeze(train_model.vf), R) train_loss = pg_loss + vf_coef * vf_loss ##Fisher loss construction - self.pg_fisher = pg_fisher_loss = -tf.reduce_mean(logpac) + self.pg_fisher = pg_fisher_loss = -tf.reduce_mean(neglogpac) sample_net = train_model.vf + tf.random_normal(tf.shape(train_model.vf)) self.vf_fisher = vf_fisher_loss = - vf_fisher_coef*tf.reduce_mean(tf.pow(train_model.vf - tf.stop_gradient(sample_net), 2)) self.joint_fisher = joint_fisher_loss = pg_fisher_loss + vf_fisher_loss - self.params=params = find_trainable_variables("model") + self.params=params = find_trainable_variables("acktr_model") self.grads_check = grads = tf.gradients(train_loss,params) @@ -105,12 +104,17 @@ class 
Model(object): self.initial_state = step_model.initial_state tf.global_variables_initializer().run(session=sess) -def learn(policy, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=1, nprocs=32, nsteps=20, +def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=1, nprocs=32, nsteps=20, ent_coef=0.01, vf_coef=0.5, vf_fisher_coef=1.0, lr=0.25, max_grad_norm=0.5, - kfac_clip=0.001, save_interval=None, lrschedule='linear'): - tf.reset_default_graph() + kfac_clip=0.001, save_interval=None, lrschedule='linear', **network_kwargs): set_global_seeds(seed) + + if network == 'cnn': + network_kwargs['one_dim_bias'] = True + + policy = build_policy(env, network, **network_kwargs) + nenvs = env.num_envs ob_space = env.observation_space ac_space = env.action_space @@ -153,3 +157,4 @@ def learn(policy, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval coord.request_stop() coord.join(enqueue_threads) env.close() + return model diff --git a/baselines/acktr/run_atari.py b/baselines/acktr/run_atari.py index 6e398ce..50e1580 100644 --- a/baselines/acktr/run_atari.py +++ b/baselines/acktr/run_atari.py @@ -6,11 +6,11 @@ from baselines import logger from baselines.acktr.acktr_disc import learn from baselines.common.cmd_util import make_atari_env, atari_arg_parser from baselines.common.vec_env.vec_frame_stack import VecFrameStack -from baselines.ppo2.policies import CnnPolicy +from baselines.common.policies import cnn def train(env_id, num_timesteps, seed, num_cpu): env = VecFrameStack(make_atari_env(env_id, num_cpu, seed), 4) - policy_fn = partial(CnnPolicy, one_dim_bias=True) + policy_fn = cnn(env=env, one_dim_bias=True) learn(policy_fn, env, seed, total_timesteps=int(num_timesteps * 1.1), nprocs=num_cpu) env.close() diff --git a/baselines/bench/benchmarks.py b/baselines/bench/benchmarks.py index a5a35f8..64a513c 100644 --- a/baselines/bench/benchmarks.py +++ b/baselines/bench/benchmarks.py @@ -59,7 +59,7 @@ register_benchmark({ register_benchmark({ 'name': 'Atari10M', 'description': '7 Atari games from Mnih et al. 
(2013), with pixel observations, 10M timesteps', - 'tasks': [{'desc': _game, 'env_id': _game + _ATARI_SUFFIX, 'trials': 2, 'num_timesteps': int(10e6)} for _game in _atari7] + 'tasks': [{'desc': _game, 'env_id': _game + _ATARI_SUFFIX, 'trials': 6, 'num_timesteps': int(10e6)} for _game in _atari7] }) register_benchmark({ @@ -84,7 +84,7 @@ _mujocosmall = [ register_benchmark({ 'name': 'Mujoco1M', 'description': 'Some small 2D MuJoCo tasks, run for 1M timesteps', - 'tasks': [{'env_id': _envid, 'trials': 3, 'num_timesteps': int(1e6)} for _envid in _mujocosmall] + 'tasks': [{'env_id': _envid, 'trials': 6, 'num_timesteps': int(1e6)} for _envid in _mujocosmall] }) register_benchmark({ 'name': 'MujocoWalkers', diff --git a/baselines/bench/monitor.py b/baselines/bench/monitor.py index 0da1b4f..bb0c282 100644 --- a/baselines/bench/monitor.py +++ b/baselines/bench/monitor.py @@ -112,6 +112,8 @@ def load_results(dir): with open(fname, 'rt') as fh: if fname.endswith('csv'): firstline = fh.readline() + if not firstline: + continue assert firstline[0] == '#' header = json.loads(firstline[1:]) df = pandas.read_csv(fh, index_col=None) @@ -158,4 +160,4 @@ def test_monitor(): last_logline = pandas.read_csv(f, index_col=None) assert set(last_logline.keys()) == {'l', 't', 'r'}, "Incorrect keys in monitor logline" f.close() - os.remove(mon_file) \ No newline at end of file + os.remove(mon_file) diff --git a/baselines/common/atari_wrappers.py b/baselines/common/atari_wrappers.py index 2aefad7..4598e23 100644 --- a/baselines/common/atari_wrappers.py +++ b/baselines/common/atari_wrappers.py @@ -1,4 +1,6 @@ import numpy as np +import os +os.environ.setdefault('PATH', '') from collections import deque import gym from gym import spaces diff --git a/baselines/common/cmd_util.py b/baselines/common/cmd_util.py index 5707695..8ce8bac 100644 --- a/baselines/common/cmd_util.py +++ b/baselines/common/cmd_util.py @@ -3,7 +3,11 @@ Helpers for scripts like run_atari.py. """ import os -from mpi4py import MPI +try: + from mpi4py import MPI +except ImportError: + MPI = None + import gym from gym.wrappers import FlattenDictWrapper from baselines import logger @@ -20,22 +24,28 @@ def make_atari_env(env_id, num_env, seed, wrapper_kwargs=None, start_index=0): def make_env(rank): # pylint: disable=C0111 def _thunk(): env = make_atari(env_id) - env.seed(seed + rank) + env.seed(seed + rank if seed is not None else None) env = Monitor(env, logger.get_dir() and os.path.join(logger.get_dir(), str(rank))) return wrap_deepmind(env, **wrapper_kwargs) return _thunk set_global_seeds(seed) return SubprocVecEnv([make_env(i + start_index) for i in range(num_env)]) -def make_mujoco_env(env_id, seed): +def make_mujoco_env(env_id, seed, reward_scale=1.0): """ Create a wrapped, monitored gym.Env for MuJoCo. 
""" rank = MPI.COMM_WORLD.Get_rank() - set_global_seeds(seed + 10000 * rank) + myseed = seed + 1000 * rank if seed is not None else None + set_global_seeds(myseed) env = gym.make(env_id) - env = Monitor(env, os.path.join(logger.get_dir(), str(rank))) + env = Monitor(env, os.path.join(logger.get_dir(), str(rank)), allow_early_resets=True) env.seed(seed) + + if reward_scale != 1.0: + from baselines.common.retro_wrappers import RewardScaler + env = RewardScaler(env, reward_scale) + return env def make_robotics_env(env_id, seed, rank=0): @@ -64,18 +74,28 @@ def atari_arg_parser(): """ parser = arg_parser() parser.add_argument('--env', help='environment ID', default='BreakoutNoFrameskip-v4') - parser.add_argument('--seed', help='RNG seed', type=int, default=0) + parser.add_argument('--seed', help='RNG seed', type=int, default=None) parser.add_argument('--num-timesteps', type=int, default=int(10e6)) return parser def mujoco_arg_parser(): + print('Obsolete - use common_arg_parser instead') + return common_arg_parser() + +def common_arg_parser(): """ Create an argparse.ArgumentParser for run_mujoco.py. """ parser = arg_parser() parser.add_argument('--env', help='environment ID', type=str, default='Reacher-v2') - parser.add_argument('--seed', help='RNG seed', type=int, default=0) - parser.add_argument('--num-timesteps', type=int, default=int(1e6)) + parser.add_argument('--seed', help='RNG seed', type=int, default=None) + parser.add_argument('--alg', help='Algorithm', type=str, default='ppo2') + parser.add_argument('--num-timesteps', type=float, default=1e6), + parser.add_argument('--network', help='network type (mlp, cnn, lstm, cnn_lstm, conv_only)', default=None) + parser.add_argument('--gamestate', help='game state to load (so far only used in retro games)', default=None) + parser.add_argument('--num-env', help='Number of environment copies being run in parallel. When not specified, set to number of cpus for Atari, and to 1 for Mujoco', default=None, type=int) + parser.add_argument('--reward-scale', help='Reward scale factor. 
Default: 1.0', default=1.0, type=float) + parser.add_argument('--model-path', help='Path to save and load trained models from', default=None, type=str) parser.add_argument('--play', default=False, action='store_true') return parser @@ -85,6 +105,24 @@ def robotics_arg_parser(): """ parser = arg_parser() parser.add_argument('--env', help='environment ID', type=str, default='FetchReach-v0') - parser.add_argument('--seed', help='RNG seed', type=int, default=0) + parser.add_argument('--seed', help='RNG seed', type=int, default=None) parser.add_argument('--num-timesteps', type=int, default=int(1e6)) return parser + + +def parse_unknown_args(args): + """ + Parse arguments not consumed by arg parser into a dicitonary + """ + retval = {} + for arg in args: + assert arg.startswith('--') + assert '=' in arg, 'cannot parse arg {}'.format(arg) + key = arg.split('=')[0][2:] + value = arg.split('=')[1] + retval[key] = value + + return retval + + + diff --git a/baselines/common/distributions.py b/baselines/common/distributions.py index 8a57c37..29f3632 100644 --- a/baselines/common/distributions.py +++ b/baselines/common/distributions.py @@ -85,7 +85,7 @@ class DiagGaussianPdType(PdType): def pdfromlatent(self, latent_vector, init_scale=1.0, init_bias=0.0): mean = fc(latent_vector, 'pi', self.size, init_scale=init_scale, init_bias=init_bias) - logstd = tf.get_variable(name='logstd', shape=[1, self.size], initializer=tf.zeros_initializer()) + logstd = tf.get_variable(name='pi/logstd', shape=[1, self.size], initializer=tf.zeros_initializer()) pdparam = tf.concat([mean, mean * 0.0 + logstd], axis=1) return self.pdfromflat(pdparam), mean @@ -143,26 +143,26 @@ class CategoricalPd(Pd): # Note: we can't use sparse_softmax_cross_entropy_with_logits because # the implementation does not allow second-order derivatives... 
one_hot_actions = tf.one_hot(x, self.logits.get_shape().as_list()[-1]) - return tf.nn.softmax_cross_entropy_with_logits( + return tf.nn.softmax_cross_entropy_with_logits_v2( logits=self.logits, labels=one_hot_actions) def kl(self, other): - a0 = self.logits - tf.reduce_max(self.logits, axis=-1, keep_dims=True) - a1 = other.logits - tf.reduce_max(other.logits, axis=-1, keep_dims=True) + a0 = self.logits - tf.reduce_max(self.logits, axis=-1, keepdims=True) + a1 = other.logits - tf.reduce_max(other.logits, axis=-1, keepdims=True) ea0 = tf.exp(a0) ea1 = tf.exp(a1) - z0 = tf.reduce_sum(ea0, axis=-1, keep_dims=True) - z1 = tf.reduce_sum(ea1, axis=-1, keep_dims=True) + z0 = tf.reduce_sum(ea0, axis=-1, keepdims=True) + z1 = tf.reduce_sum(ea1, axis=-1, keepdims=True) p0 = ea0 / z0 return tf.reduce_sum(p0 * (a0 - tf.log(z0) - a1 + tf.log(z1)), axis=-1) def entropy(self): - a0 = self.logits - tf.reduce_max(self.logits, axis=-1, keep_dims=True) + a0 = self.logits - tf.reduce_max(self.logits, axis=-1, keepdims=True) ea0 = tf.exp(a0) - z0 = tf.reduce_sum(ea0, axis=-1, keep_dims=True) + z0 = tf.reduce_sum(ea0, axis=-1, keepdims=True) p0 = ea0 / z0 return tf.reduce_sum(p0 * (tf.log(z0) - a0), axis=-1) def sample(self): - u = tf.random_uniform(tf.shape(self.logits)) + u = tf.random_uniform(tf.shape(self.logits), dtype=self.logits.dtype) return tf.argmax(self.logits - tf.log(-tf.log(u)), axis=-1) @classmethod def fromflat(cls, flat): diff --git a/baselines/common/input.py b/baselines/common/input.py index 7fbf9fc..dff9480 100644 --- a/baselines/common/input.py +++ b/baselines/common/input.py @@ -1,30 +1,56 @@ import tensorflow as tf from gym.spaces import Discrete, Box -def observation_input(ob_space, batch_size=None, name='Ob'): - ''' - Build observation input with encoding depending on the - observation space type - Params: +def observation_placeholder(ob_space, batch_size=None, name='Ob'): + ''' + Create placeholder to feed observations into of the size appropriate to the observation space - ob_space: observation space (should be one of gym.spaces) - batch_size: batch size for input (default is None, so that resulting input placeholder can take tensors with any batch size) - name: tensorflow variable name for input placeholder + Parameters: + ---------- - returns: tuple (input_placeholder, processed_input_tensor) + ob_space: gym.Space observation space + + batch_size: int size of the batch to be fed into input. Can be left None in most cases. + + name: str name of the placeholder + + Returns: + ------- + + tensorflow placeholder tensor + ''' + + assert isinstance(ob_space, Discrete) or isinstance(ob_space, Box), \ + 'Can only deal with Discrete and Box observation spaces for now' + + return tf.placeholder(shape=(batch_size,) + ob_space.shape, dtype=ob_space.dtype, name=name) + + +def observation_input(ob_space, batch_size=None, name='Ob'): + ''' + Create placeholder to feed observations into of the size appropriate to the observation space, and add input + encoder of the appropriate type. 
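As a rough illustration of these new observation helpers (a hypothetical snippet, with the space and names chosen only for the example): for a `Discrete` space the encoder one-hots the integer observation, while a `Box` observation is simply cast to float.

```python
# Hypothetical usage of observation_placeholder / encode_observation for a Discrete space.
from gym.spaces import Discrete
from baselines.common.input import observation_placeholder, encode_observation

ob_space = Discrete(5)
ph = observation_placeholder(ob_space)        # integer placeholder of shape (None,)
encoded = encode_observation(ob_space, ph)    # float32 one-hot tensor of shape (None, 5)
```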
+ ''' + + placeholder = observation_placeholder(ob_space, batch_size, name) + return placeholder, encode_observation(ob_space, placeholder) + +def encode_observation(ob_space, placeholder): + ''' + Encode input in the way that is appropriate to the observation space + + Parameters: + ---------- + + ob_space: gym.Space observation space + + placeholder: tf.placeholder observation input placeholder ''' if isinstance(ob_space, Discrete): - input_x = tf.placeholder(shape=(batch_size,), dtype=tf.int32, name=name) - processed_x = tf.to_float(tf.one_hot(input_x, ob_space.n)) - return input_x, processed_x + return tf.to_float(tf.one_hot(placeholder, ob_space.n)) elif isinstance(ob_space, Box): - input_shape = (batch_size,) + ob_space.shape - input_x = tf.placeholder(shape=input_shape, dtype=ob_space.dtype, name=name) - processed_x = tf.to_float(input_x) - return input_x, processed_x - + return tf.to_float(placeholder) else: raise NotImplementedError - diff --git a/baselines/common/misc_util.py b/baselines/common/misc_util.py index 9985dea..451de1c 100644 --- a/baselines/common/misc_util.py +++ b/baselines/common/misc_util.py @@ -67,14 +67,21 @@ class EzPickle(object): def set_global_seeds(i): + try: + import MPI + rank = MPI.COMM_WORLD.Get_rank() + except ImportError: + rank = 0 + + myseed = i + 1000 * rank if i is not None else None try: import tensorflow as tf except ImportError: pass else: - tf.set_random_seed(i) - np.random.seed(i) - random.seed(i) + tf.set_random_seed(myseed) + np.random.seed(myseed) + random.seed(myseed) def pretty_eta(seconds_left): diff --git a/baselines/common/runners.py b/baselines/common/runners.py index 0a4b221..c30e322 100644 --- a/baselines/common/runners.py +++ b/baselines/common/runners.py @@ -5,7 +5,7 @@ class AbstractEnvRunner(ABC): def __init__(self, *, env, model, nsteps): self.env = env self.model = model - nenv = env.num_envs + self.nenv = nenv = env.num_envs if hasattr(env, 'num_envs') else 1 self.batch_ob_shape = (nenv*nsteps,) + env.observation_space.shape self.obs = np.zeros((nenv,) + env.observation_space.shape, dtype=env.observation_space.dtype.name) self.obs[:] = env.reset() @@ -16,3 +16,4 @@ class AbstractEnvRunner(ABC): @abstractmethod def run(self): raise NotImplementedError + diff --git a/baselines/common/test_identity.py b/baselines/common/test_identity.py deleted file mode 100644 index a429e0c..0000000 --- a/baselines/common/test_identity.py +++ /dev/null @@ -1,44 +0,0 @@ -import pytest -import tensorflow as tf -import random -import numpy as np -from gym.spaces import np_random - -from baselines.a2c import a2c -from baselines.ppo2 import ppo2 -from baselines.common.identity_env import IdentityEnv -from baselines.common.vec_env.dummy_vec_env import DummyVecEnv -from baselines.ppo2.policies import MlpPolicy - - -learn_func_list = [ - lambda e: a2c.learn(policy=MlpPolicy, env=e, seed=0, total_timesteps=50000), - lambda e: ppo2.learn(policy=MlpPolicy, env=e, total_timesteps=50000, lr=1e-3, nsteps=128, ent_coef=0.01) -] - - -@pytest.mark.slow -@pytest.mark.parametrize("learn_func", learn_func_list) -def test_identity(learn_func): - ''' - Test if the algorithm (with a given policy) - can learn an identity transformation (i.e. 
return observation as an action) - ''' - np.random.seed(0) - np_random.seed(0) - random.seed(0) - - env = DummyVecEnv([lambda: IdentityEnv(10)]) - - with tf.Graph().as_default(), tf.Session().as_default(): - tf.set_random_seed(0) - model = learn_func(env) - - N_TRIALS = 1000 - sum_rew = 0 - obs = env.reset() - for i in range(N_TRIALS): - obs, rew, done, _ = env.step(model.step(obs)[0]) - sum_rew += rew - - assert sum_rew > 0.9 * N_TRIALS diff --git a/baselines/common/tf_util.py b/baselines/common/tf_util.py index afcd593..dd57d6e 100644 --- a/baselines/common/tf_util.py +++ b/baselines/common/tf_util.py @@ -48,17 +48,28 @@ def huber_loss(x, delta=1.0): # Global session # ================================================================ -def make_session(num_cpu=None, make_default=False, graph=None): +def get_session(config=None): + """Get default session or create one with a given config""" + sess = tf.get_default_session() + if sess is None: + sess = make_session(config=config, make_default=True) + return sess + +def make_session(config=None, num_cpu=None, make_default=False, graph=None): """Returns a session that will use CPU's only""" if num_cpu is None: num_cpu = int(os.getenv('RCALL_NUM_CPU', multiprocessing.cpu_count())) - tf_config = tf.ConfigProto( - inter_op_parallelism_threads=num_cpu, - intra_op_parallelism_threads=num_cpu) + if config is None: + config = tf.ConfigProto( + allow_soft_placement=True, + inter_op_parallelism_threads=num_cpu, + intra_op_parallelism_threads=num_cpu) + config.gpu_options.allow_growth = True + if make_default: - return tf.InteractiveSession(config=tf_config, graph=graph) + return tf.InteractiveSession(config=config, graph=graph) else: - return tf.Session(config=tf_config, graph=graph) + return tf.Session(config=config, graph=graph) def single_threaded_session(): """Returns a session which will only use a single CPU""" @@ -76,7 +87,7 @@ ALREADY_INITIALIZED = set() def initialize(): """Initialize all the uninitialized variables in the global scope.""" new_variables = set(tf.global_variables()) - ALREADY_INITIALIZED - tf.get_default_session().run(tf.variables_initializer(new_variables)) + get_session().run(tf.variables_initializer(new_variables)) ALREADY_INITIALIZED.update(new_variables) # ================================================================ @@ -85,7 +96,7 @@ def initialize(): def normc_initializer(std=1.0, axis=0): def _initializer(shape, dtype=None, partition_info=None): # pylint: disable=W0613 - out = np.random.randn(*shape).astype(np.float32) + out = np.random.randn(*shape).astype(dtype.as_numpy_dtype) out *= std / np.sqrt(np.square(out).sum(axis=axis, keepdims=True)) return tf.constant(out) return _initializer @@ -179,7 +190,7 @@ class _Function(object): if hasattr(inpt, 'make_feed_dict'): feed_dict.update(inpt.make_feed_dict(value)) else: - feed_dict[inpt] = value + feed_dict[inpt] = adjust_shape(inpt, value) def __call__(self, *args): assert len(args) <= len(self.inputs), "Too many arguments provided" @@ -189,8 +200,8 @@ class _Function(object): self._feed_input(feed_dict, inpt, value) # Update feed dict with givens. 
for inpt in self.givens: - feed_dict[inpt] = feed_dict.get(inpt, self.givens[inpt]) - results = tf.get_default_session().run(self.outputs_update, feed_dict=feed_dict)[:-1] + feed_dict[inpt] = adjust_shape(inpt, feed_dict.get(inpt, self.givens[inpt])) + results = get_session().run(self.outputs_update, feed_dict=feed_dict)[:-1] return results # ================================================================ @@ -243,27 +254,34 @@ class GetFlat(object): def __call__(self): return tf.get_default_session().run(self.op) +def flattenallbut0(x): + return tf.reshape(x, [-1, intprod(x.get_shape().as_list()[1:])]) + +# ============================================================= +# TF placeholders management +# ============================================================ + _PLACEHOLDER_CACHE = {} # name -> (placeholder, dtype, shape) def get_placeholder(name, dtype, shape): if name in _PLACEHOLDER_CACHE: out, dtype1, shape1 = _PLACEHOLDER_CACHE[name] - assert dtype1 == dtype and shape1 == shape - return out - else: - out = tf.placeholder(dtype=dtype, shape=shape, name=name) - _PLACEHOLDER_CACHE[name] = (out, dtype, shape) - return out + if out.graph == tf.get_default_graph(): + assert dtype1 == dtype and shape1 == shape, \ + 'Placeholder with name {} has already been registered and has shape {}, different from requested {}'.format(name, shape1, shape) + return out + + out = tf.placeholder(dtype=dtype, shape=shape, name=name) + _PLACEHOLDER_CACHE[name] = (out, dtype, shape) + return out def get_placeholder_cached(name): return _PLACEHOLDER_CACHE[name][0] -def flattenallbut0(x): - return tf.reshape(x, [-1, intprod(x.get_shape().as_list()[1:])]) # ================================================================ -# Diagnostics +# Diagnostics # ================================================================ def display_var_info(vars): @@ -283,7 +301,7 @@ def display_var_info(vars): def get_available_gpus(): # recipe from here: # https://stackoverflow.com/questions/38559755/how-to-get-current-available-gpus-in-tensorflow?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa - + from tensorflow.python.client import device_lib local_device_protos = device_lib.list_local_devices() return [x.name for x in local_device_protos if x.device_type == 'GPU'] @@ -301,4 +319,61 @@ def save_state(fname): saver = tf.train.Saver() saver.save(tf.get_default_session(), fname) +# ================================================================ +# Shape adjustment for feeding into tf placeholders +# ================================================================ +def adjust_shape(placeholder, data): + ''' + adjust shape of the data to the shape of the placeholder if possible. + If shape is incompatible, AssertionError is thrown + + Parameters: + placeholder tensorflow input placeholder + + data input data to be (potentially) reshaped to be fed into placeholder + + Returns: + reshaped data + ''' + + if not isinstance(data, np.ndarray) and not isinstance(data, list): + return data + if isinstance(data, list): + data = np.array(data) + + placeholder_shape = [x or -1 for x in placeholder.shape.as_list()] + + assert _check_shape(placeholder_shape, data.shape), \ + 'Shape of data {} is not compatible with shape of the placeholder {}'.format(data.shape, placeholder_shape) + + return np.reshape(data, placeholder_shape) + + +def _check_shape(placeholder_shape, data_shape): + ''' check if two shapes are compatible (i.e. 
differ only by dimensions of size 1, or by the batch dimension)''' + + return True + squeezed_placeholder_shape = _squeeze_shape(placeholder_shape) + squeezed_data_shape = _squeeze_shape(data_shape) + + for i, s_data in enumerate(squeezed_data_shape): + s_placeholder = squeezed_placeholder_shape[i] + if s_placeholder != -1 and s_data != s_placeholder: + return False + + return True + + +def _squeeze_shape(shape): + return [x for x in shape if x != 1] + +# Tensorboard interfacing +# ================================================================ + +def launch_tensorboard_in_background(log_dir): + from tensorboard import main as tb + import threading + tf.flags.FLAGS.logdir = log_dir + t = threading.Thread(target=tb.main, args=([])) + t.start() diff --git a/baselines/common/vec_env/dummy_vec_env.py b/baselines/common/vec_env/dummy_vec_env.py index d0ae455..477bf30 100644 --- a/baselines/common/vec_env/dummy_vec_env.py +++ b/baselines/common/vec_env/dummy_vec_env.py @@ -30,15 +30,30 @@ class DummyVecEnv(VecEnv): self.actions = None def step_async(self, actions): - self.actions = actions + listify = True + try: + if len(actions) == self.num_envs: + listify = False + except TypeError: + pass + + if not listify: + self.actions = actions + else: + assert self.num_envs == 1, "actions {} is either not a list or has a wrong size - cannot match to {} environments".format(actions, self.num_envs) + self.actions = [actions] def step_wait(self): for e in range(self.num_envs): - obs, self.buf_rews[e], self.buf_dones[e], self.buf_infos[e] = self.envs[e].step(self.actions[e]) + action = self.actions[e] + if isinstance(self.envs[e].action_space, spaces.Discrete): + action = int(action) + + obs, self.buf_rews[e], self.buf_dones[e], self.buf_infos[e] = self.envs[e].step(action) if self.buf_dones[e]: obs = self.envs[e].reset() self._save_obs(e, obs) - return (self._obs_from_buf(), np.copy(self.buf_rews), np.copy(self.buf_dones), + return (np.copy(self._obs_from_buf()), np.copy(self.buf_rews), np.copy(self.buf_dones), self.buf_infos.copy()) def reset(self): diff --git a/baselines/common/vec_env/subproc_vec_env.py b/baselines/common/vec_env/subproc_vec_env.py index fb55df4..e5b5b32 100644 --- a/baselines/common/vec_env/subproc_vec_env.py +++ b/baselines/common/vec_env/subproc_vec_env.py @@ -7,26 +7,30 @@ from baselines.common.tile_images import tile_images def worker(remote, parent_remote, env_fn_wrapper): parent_remote.close() env = env_fn_wrapper.x() - while True: - cmd, data = remote.recv() - if cmd == 'step': - ob, reward, done, info = env.step(data) - if done: + try: + while True: + cmd, data = remote.recv() + if cmd == 'step': + ob, reward, done, info = env.step(data) + if done: + ob = env.reset() + remote.send((ob, reward, done, info)) + elif cmd == 'reset': ob = env.reset() - remote.send((ob, reward, done, info)) - elif cmd == 'reset': - ob = env.reset() - remote.send(ob) - elif cmd == 'render': - remote.send(env.render(mode='rgb_array')) - elif cmd == 'close': - remote.close() - break - elif cmd == 'get_spaces': - remote.send((env.observation_space, env.action_space)) - else: - raise NotImplementedError - + remote.send(ob) + elif cmd == 'render': + remote.send(env.render(mode='rgb_array')) + elif cmd == 'close': + remote.close() + break + elif cmd == 'get_spaces': + remote.send((env.observation_space, env.action_space)) + else: + raise NotImplementedError + except KeyboardInterrupt: + print('SubprocVecEnv worker: got KeyboardInterrupt') + finally: + env.close() class SubprocVecEnv(VecEnv): def 
__init__(self, env_fns, spaces=None): diff --git a/baselines/ddpg/ddpg.py b/baselines/ddpg/ddpg.py index e2d4950..6664cc4 100644 --- a/baselines/ddpg/ddpg.py +++ b/baselines/ddpg/ddpg.py @@ -26,9 +26,9 @@ def reduce_std(x, axis=None, keepdims=False): return tf.sqrt(reduce_var(x, axis=axis, keepdims=keepdims)) def reduce_var(x, axis=None, keepdims=False): - m = tf.reduce_mean(x, axis=axis, keep_dims=True) + m = tf.reduce_mean(x, axis=axis, keepdims=True) devs_squared = tf.square(x - m) - return tf.reduce_mean(devs_squared, axis=axis, keep_dims=keepdims) + return tf.reduce_mean(devs_squared, axis=axis, keepdims=keepdims) def get_target_updates(vars, target_vars, tau): logger.info('setting up target updates ...') diff --git a/baselines/deepq/__init__.py b/baselines/deepq/__init__.py index 4472399..6d2e168 100644 --- a/baselines/deepq/__init__.py +++ b/baselines/deepq/__init__.py @@ -1,8 +1,8 @@ from baselines.deepq import models # noqa from baselines.deepq.build_graph import build_act, build_train # noqa -from baselines.deepq.simple import learn, load # noqa +from baselines.deepq.deepq import learn, load # noqa from baselines.deepq.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer # noqa def wrap_atari_dqn(env): from baselines.common.atari_wrappers import wrap_deepmind - return wrap_deepmind(env, frame_stack=True, scale=True) \ No newline at end of file + return wrap_deepmind(env, frame_stack=True, scale=True) diff --git a/baselines/deepq/models.py b/baselines/deepq/models.py index 198d795..c41b707 100644 --- a/baselines/deepq/models.py +++ b/baselines/deepq/models.py @@ -89,3 +89,41 @@ def cnn_to_mlp(convs, hiddens, dueling=False, layer_norm=False): return lambda *args, **kwargs: _cnn_to_mlp(convs, hiddens, dueling, layer_norm=layer_norm, *args, **kwargs) + + +def build_q_func(network, hiddens=[256], dueling=True, layer_norm=False, **network_kwargs): + if isinstance(network, str): + from baselines.common.models import get_network_builder + network = get_network_builder(network)(**network_kwargs) + + def q_func_builder(input_placeholder, num_actions, scope, reuse=False): + with tf.variable_scope(scope, reuse=reuse): + latent, _ = network(input_placeholder) + latent = layers.flatten(latent) + + with tf.variable_scope("action_value"): + action_out = latent + for hidden in hiddens: + action_out = layers.fully_connected(action_out, num_outputs=hidden, activation_fn=None) + if layer_norm: + action_out = layers.layer_norm(action_out, center=True, scale=True) + action_out = tf.nn.relu(action_out) + action_scores = layers.fully_connected(action_out, num_outputs=num_actions, activation_fn=None) + + if dueling: + with tf.variable_scope("state_value"): + state_out = latent + for hidden in hiddens: + state_out = layers.fully_connected(state_out, num_outputs=hidden, activation_fn=None) + if layer_norm: + state_out = layers.layer_norm(state_out, center=True, scale=True) + state_out = tf.nn.relu(state_out) + state_score = layers.fully_connected(state_out, num_outputs=1, activation_fn=None) + action_scores_mean = tf.reduce_mean(action_scores, 1) + action_scores_centered = action_scores - tf.expand_dims(action_scores_mean, 1) + q_out = state_score + action_scores_centered + else: + q_out = action_scores + return q_out + + return q_func_builder diff --git a/baselines/deepq/simple.py b/baselines/deepq/simple.py deleted file mode 100644 index 4bad145..0000000 --- a/baselines/deepq/simple.py +++ /dev/null @@ -1,306 +0,0 @@ -import os -import tempfile - -import tensorflow as tf -import zipfile -import 
cloudpickle -import numpy as np - -import baselines.common.tf_util as U -from baselines.common.tf_util import load_state, save_state -from baselines import logger -from baselines.common.schedules import LinearSchedule -from baselines.common.input import observation_input - -from baselines import deepq -from baselines.deepq.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer -from baselines.deepq.utils import ObservationInput - - -class ActWrapper(object): - def __init__(self, act, act_params): - self._act = act - self._act_params = act_params - - @staticmethod - def load(path): - with open(path, "rb") as f: - model_data, act_params = cloudpickle.load(f) - act = deepq.build_act(**act_params) - sess = tf.Session() - sess.__enter__() - with tempfile.TemporaryDirectory() as td: - arc_path = os.path.join(td, "packed.zip") - with open(arc_path, "wb") as f: - f.write(model_data) - - zipfile.ZipFile(arc_path, 'r', zipfile.ZIP_DEFLATED).extractall(td) - load_state(os.path.join(td, "model")) - - return ActWrapper(act, act_params) - - def __call__(self, *args, **kwargs): - return self._act(*args, **kwargs) - - def save(self, path=None): - """Save model to a pickle located at `path`""" - if path is None: - path = os.path.join(logger.get_dir(), "model.pkl") - - with tempfile.TemporaryDirectory() as td: - save_state(os.path.join(td, "model")) - arc_name = os.path.join(td, "packed.zip") - with zipfile.ZipFile(arc_name, 'w') as zipf: - for root, dirs, files in os.walk(td): - for fname in files: - file_path = os.path.join(root, fname) - if file_path != arc_name: - zipf.write(file_path, os.path.relpath(file_path, td)) - with open(arc_name, "rb") as f: - model_data = f.read() - with open(path, "wb") as f: - cloudpickle.dump((model_data, self._act_params), f) - - -def load(path): - """Load act function that was returned by learn function. - - Parameters - ---------- - path: str - path to the act function pickle - - Returns - ------- - act: ActWrapper - function that takes a batch of observations - and returns actions. - """ - return ActWrapper.load(path) - - -def learn(env, - q_func, - lr=5e-4, - max_timesteps=100000, - buffer_size=50000, - exploration_fraction=0.1, - exploration_final_eps=0.02, - train_freq=1, - batch_size=32, - print_freq=100, - checkpoint_freq=10000, - checkpoint_path=None, - learning_starts=1000, - gamma=1.0, - target_network_update_freq=500, - prioritized_replay=False, - prioritized_replay_alpha=0.6, - prioritized_replay_beta0=0.4, - prioritized_replay_beta_iters=None, - prioritized_replay_eps=1e-6, - param_noise=False, - callback=None): - """Train a deepq model. - - Parameters - ------- - env: gym.Env - environment to train on - q_func: (tf.Variable, int, str, bool) -> tf.Variable - the model that takes the following inputs: - observation_in: object - the output of observation placeholder - num_actions: int - number of actions - scope: str - reuse: bool - should be passed to outer variable scope - and returns a tensor of shape (batch_size, num_actions) with values of every action. - lr: float - learning rate for adam optimizer - max_timesteps: int - number of env steps to optimizer for - buffer_size: int - size of the replay buffer - exploration_fraction: float - fraction of entire training period over which the exploration rate is annealed - exploration_final_eps: float - final value of random action probability - train_freq: int - update the model every `train_freq` steps. 
- set to None to disable printing - batch_size: int - size of a batched sampled from replay buffer for training - print_freq: int - how often to print out training progress - set to None to disable printing - checkpoint_freq: int - how often to save the model. This is so that the best version is restored - at the end of the training. If you do not wish to restore the best version at - the end of the training set this variable to None. - learning_starts: int - how many steps of the model to collect transitions for before learning starts - gamma: float - discount factor - target_network_update_freq: int - update the target network every `target_network_update_freq` steps. - prioritized_replay: True - if True prioritized replay buffer will be used. - prioritized_replay_alpha: float - alpha parameter for prioritized replay buffer - prioritized_replay_beta0: float - initial value of beta for prioritized replay buffer - prioritized_replay_beta_iters: int - number of iterations over which beta will be annealed from initial value - to 1.0. If set to None equals to max_timesteps. - prioritized_replay_eps: float - epsilon to add to the TD errors when updating priorities. - callback: (locals, globals) -> None - function called at every steps with state of the algorithm. - If callback returns true training stops. - - Returns - ------- - act: ActWrapper - Wrapper over act function. Adds ability to save it and load it. - See header of baselines/deepq/categorical.py for details on the act function. - """ - # Create all the functions necessary to train the model - - sess = tf.Session() - sess.__enter__() - - # capture the shape outside the closure so that the env object is not serialized - # by cloudpickle when serializing make_obs_ph - - def make_obs_ph(name): - return ObservationInput(env.observation_space, name=name) - - act, train, update_target, debug = deepq.build_train( - make_obs_ph=make_obs_ph, - q_func=q_func, - num_actions=env.action_space.n, - optimizer=tf.train.AdamOptimizer(learning_rate=lr), - gamma=gamma, - grad_norm_clipping=10, - param_noise=param_noise - ) - - act_params = { - 'make_obs_ph': make_obs_ph, - 'q_func': q_func, - 'num_actions': env.action_space.n, - } - - act = ActWrapper(act, act_params) - - # Create the replay buffer - if prioritized_replay: - replay_buffer = PrioritizedReplayBuffer(buffer_size, alpha=prioritized_replay_alpha) - if prioritized_replay_beta_iters is None: - prioritized_replay_beta_iters = max_timesteps - beta_schedule = LinearSchedule(prioritized_replay_beta_iters, - initial_p=prioritized_replay_beta0, - final_p=1.0) - else: - replay_buffer = ReplayBuffer(buffer_size) - beta_schedule = None - # Create the schedule for exploration starting from 1. - exploration = LinearSchedule(schedule_timesteps=int(exploration_fraction * max_timesteps), - initial_p=1.0, - final_p=exploration_final_eps) - - # Initialize the parameters and copy them to the target network. 
- U.initialize() - update_target() - - episode_rewards = [0.0] - saved_mean_reward = None - obs = env.reset() - reset = True - - with tempfile.TemporaryDirectory() as td: - td = checkpoint_path or td - - model_file = os.path.join(td, "model") - model_saved = False - if tf.train.latest_checkpoint(td) is not None: - load_state(model_file) - logger.log('Loaded model from {}'.format(model_file)) - model_saved = True - - for t in range(max_timesteps): - if callback is not None: - if callback(locals(), globals()): - break - # Take action and update exploration to the newest value - kwargs = {} - if not param_noise: - update_eps = exploration.value(t) - update_param_noise_threshold = 0. - else: - update_eps = 0. - # Compute the threshold such that the KL divergence between perturbed and non-perturbed - # policy is comparable to eps-greedy exploration with eps = exploration.value(t). - # See Appendix C.1 in Parameter Space Noise for Exploration, Plappert et al., 2017 - # for detailed explanation. - update_param_noise_threshold = -np.log(1. - exploration.value(t) + exploration.value(t) / float(env.action_space.n)) - kwargs['reset'] = reset - kwargs['update_param_noise_threshold'] = update_param_noise_threshold - kwargs['update_param_noise_scale'] = True - action = act(np.array(obs)[None], update_eps=update_eps, **kwargs)[0] - env_action = action - reset = False - new_obs, rew, done, _ = env.step(env_action) - # Store transition in the replay buffer. - replay_buffer.add(obs, action, rew, new_obs, float(done)) - obs = new_obs - - episode_rewards[-1] += rew - if done: - obs = env.reset() - episode_rewards.append(0.0) - reset = True - - if t > learning_starts and t % train_freq == 0: - # Minimize the error in Bellman's equation on a batch sampled from replay buffer. - if prioritized_replay: - experience = replay_buffer.sample(batch_size, beta=beta_schedule.value(t)) - (obses_t, actions, rewards, obses_tp1, dones, weights, batch_idxes) = experience - else: - obses_t, actions, rewards, obses_tp1, dones = replay_buffer.sample(batch_size) - weights, batch_idxes = np.ones_like(rewards), None - td_errors = train(obses_t, actions, rewards, obses_tp1, dones, weights) - if prioritized_replay: - new_priorities = np.abs(td_errors) + prioritized_replay_eps - replay_buffer.update_priorities(batch_idxes, new_priorities) - - if t > learning_starts and t % target_network_update_freq == 0: - # Update target network periodically. 
- update_target() - - mean_100ep_reward = round(np.mean(episode_rewards[-101:-1]), 1) - num_episodes = len(episode_rewards) - if done and print_freq is not None and len(episode_rewards) % print_freq == 0: - logger.record_tabular("steps", t) - logger.record_tabular("episodes", num_episodes) - logger.record_tabular("mean 100 episode reward", mean_100ep_reward) - logger.record_tabular("% time spent exploring", int(100 * exploration.value(t))) - logger.dump_tabular() - - if (checkpoint_freq is not None and t > learning_starts and - num_episodes > 100 and t % checkpoint_freq == 0): - if saved_mean_reward is None or mean_100ep_reward > saved_mean_reward: - if print_freq is not None: - logger.log("Saving model due to mean reward increase: {} -> {}".format( - saved_mean_reward, mean_100ep_reward)) - save_state(model_file) - model_saved = True - saved_mean_reward = mean_100ep_reward - if model_saved: - if print_freq is not None: - logger.log("Restored model with mean reward: {}".format(saved_mean_reward)) - load_state(model_file) - - return act diff --git a/baselines/deepq/test_identity.py b/baselines/deepq/test_identity.py deleted file mode 100644 index ef57e70..0000000 --- a/baselines/deepq/test_identity.py +++ /dev/null @@ -1,43 +0,0 @@ -import tensorflow as tf -import random - -from baselines import deepq -from baselines.common.identity_env import IdentityEnv - - -def test_identity(): - - with tf.Graph().as_default(): - env = IdentityEnv(10) - random.seed(0) - - tf.set_random_seed(0) - - param_noise = False - model = deepq.models.mlp([32]) - act = deepq.learn( - env, - q_func=model, - lr=1e-3, - max_timesteps=10000, - buffer_size=50000, - exploration_fraction=0.1, - exploration_final_eps=0.02, - print_freq=10, - param_noise=param_noise, - ) - - tf.set_random_seed(0) - - N_TRIALS = 1000 - sum_rew = 0 - obs = env.reset() - for i in range(N_TRIALS): - obs, rew, done, _ = env.step(act([obs])) - sum_rew += rew - - assert sum_rew > 0.9 * N_TRIALS - - -if __name__ == '__main__': - test_identity() diff --git a/baselines/deepq/utils.py b/baselines/deepq/utils.py index 90b932e..2914f43 100644 --- a/baselines/deepq/utils.py +++ b/baselines/deepq/utils.py @@ -1,4 +1,5 @@ from baselines.common.input import observation_input +from baselines.common.tf_util import adjust_shape import tensorflow as tf @@ -36,7 +37,7 @@ class PlaceholderTfInput(TfInput): return self._placeholder def make_feed_dict(self, data): - return {self._placeholder: data} + return {self._placeholder: adjust_shape(self._placeholder, data)} class Uint8Input(PlaceholderTfInput): diff --git a/baselines/ppo1/run_atari.py b/baselines/ppo1/run_atari.py index 17941c6..96e3482 100644 --- a/baselines/ppo1/run_atari.py +++ b/baselines/ppo1/run_atari.py @@ -18,7 +18,7 @@ def train(env_id, num_timesteps, seed): logger.configure() else: logger.configure(format_strs=[]) - workerseed = seed + 10000 * MPI.COMM_WORLD.Get_rank() + workerseed = seed + 10000 * MPI.COMM_WORLD.Get_rank() if seed is not None else None set_global_seeds(workerseed) env = make_atari(env_id) def policy_fn(name, ob_space, ac_space): #pylint: disable=W0613 diff --git a/baselines/ppo2/policies.py b/baselines/ppo2/policies.py deleted file mode 100644 index 6fbbb14..0000000 --- a/baselines/ppo2/policies.py +++ /dev/null @@ -1,146 +0,0 @@ -import numpy as np -import tensorflow as tf -from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch, lstm, lnlstm -from baselines.common.distributions import make_pdtype -from baselines.common.input import observation_input - 
-def nature_cnn(unscaled_images, **conv_kwargs): - """ - CNN from Nature paper. - """ - scaled_images = tf.cast(unscaled_images, tf.float32) / 255. - activ = tf.nn.relu - h = activ(conv(scaled_images, 'c1', nf=32, rf=8, stride=4, init_scale=np.sqrt(2), - **conv_kwargs)) - h2 = activ(conv(h, 'c2', nf=64, rf=4, stride=2, init_scale=np.sqrt(2), **conv_kwargs)) - h3 = activ(conv(h2, 'c3', nf=64, rf=3, stride=1, init_scale=np.sqrt(2), **conv_kwargs)) - h3 = conv_to_fc(h3) - return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2))) - -class LnLstmPolicy(object): - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, nlstm=256, reuse=False): - nenv = nbatch // nsteps - X, processed_x = observation_input(ob_space, nbatch) - M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) - S = tf.placeholder(tf.float32, [nenv, nlstm*2]) #states - self.pdtype = make_pdtype(ac_space) - with tf.variable_scope("model", reuse=reuse): - h = nature_cnn(processed_x) - xs = batch_to_seq(h, nenv, nsteps) - ms = batch_to_seq(M, nenv, nsteps) - h5, snew = lnlstm(xs, ms, S, 'lstm1', nh=nlstm) - h5 = seq_to_batch(h5) - vf = fc(h5, 'v', 1) - self.pd, self.pi = self.pdtype.pdfromlatent(h5) - - v0 = vf[:, 0] - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = np.zeros((nenv, nlstm*2), dtype=np.float32) - - def step(ob, state, mask): - return sess.run([a0, v0, snew, neglogp0], {X:ob, S:state, M:mask}) - - def value(ob, state, mask): - return sess.run(v0, {X:ob, S:state, M:mask}) - - self.X = X - self.M = M - self.S = S - self.vf = vf - self.step = step - self.value = value - -class LstmPolicy(object): - - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, nlstm=256, reuse=False): - nenv = nbatch // nsteps - self.pdtype = make_pdtype(ac_space) - X, processed_x = observation_input(ob_space, nbatch) - - M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) - S = tf.placeholder(tf.float32, [nenv, nlstm*2]) #states - with tf.variable_scope("model", reuse=reuse): - h = nature_cnn(X) - xs = batch_to_seq(h, nenv, nsteps) - ms = batch_to_seq(M, nenv, nsteps) - h5, snew = lstm(xs, ms, S, 'lstm1', nh=nlstm) - h5 = seq_to_batch(h5) - vf = fc(h5, 'v', 1) - self.pd, self.pi = self.pdtype.pdfromlatent(h5) - - v0 = vf[:, 0] - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = np.zeros((nenv, nlstm*2), dtype=np.float32) - - def step(ob, state, mask): - return sess.run([a0, v0, snew, neglogp0], {X:ob, S:state, M:mask}) - - def value(ob, state, mask): - return sess.run(v0, {X:ob, S:state, M:mask}) - - self.X = X - self.M = M - self.S = S - self.vf = vf - self.step = step - self.value = value - -class CnnPolicy(object): - - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, reuse=False, **conv_kwargs): #pylint: disable=W0613 - self.pdtype = make_pdtype(ac_space) - X, processed_x = observation_input(ob_space, nbatch) - with tf.variable_scope("model", reuse=reuse): - h = nature_cnn(processed_x, **conv_kwargs) - vf = fc(h, 'v', 1)[:,0] - self.pd, self.pi = self.pdtype.pdfromlatent(h, init_scale=0.01) - - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = None - - def step(ob, *_args, **_kwargs): - a, v, neglogp = sess.run([a0, vf, neglogp0], {X:ob}) - return a, v, self.initial_state, neglogp - - def value(ob, *_args, **_kwargs): - return sess.run(vf, {X:ob}) - - self.X = X - self.vf = vf - self.step = step - self.value = value - -class MlpPolicy(object): - def __init__(self, sess, ob_space, ac_space, nbatch, nsteps, reuse=False): 
#pylint: disable=W0613 - self.pdtype = make_pdtype(ac_space) - with tf.variable_scope("model", reuse=reuse): - X, processed_x = observation_input(ob_space, nbatch) - activ = tf.tanh - processed_x = tf.layers.flatten(processed_x) - pi_h1 = activ(fc(processed_x, 'pi_fc1', nh=64, init_scale=np.sqrt(2))) - pi_h2 = activ(fc(pi_h1, 'pi_fc2', nh=64, init_scale=np.sqrt(2))) - vf_h1 = activ(fc(processed_x, 'vf_fc1', nh=64, init_scale=np.sqrt(2))) - vf_h2 = activ(fc(vf_h1, 'vf_fc2', nh=64, init_scale=np.sqrt(2))) - vf = fc(vf_h2, 'vf', 1)[:,0] - - self.pd, self.pi = self.pdtype.pdfromlatent(pi_h2, init_scale=0.01) - - - a0 = self.pd.sample() - neglogp0 = self.pd.neglogp(a0) - self.initial_state = None - - def step(ob, *_args, **_kwargs): - a, v, neglogp = sess.run([a0, vf, neglogp0], {X:ob}) - return a, v, self.initial_state, neglogp - - def value(ob, *_args, **_kwargs): - return sess.run(vf, {X:ob}) - - self.X = X - self.vf = vf - self.step = step - self.value = value diff --git a/baselines/ppo2/ppo2.py b/baselines/ppo2/ppo2.py index fd34f52..ae3a77f 100644 --- a/baselines/ppo2/ppo2.py +++ b/baselines/ppo2/ppo2.py @@ -6,16 +6,24 @@ import os.path as osp import tensorflow as tf from baselines import logger from collections import deque -from baselines.common import explained_variance +from baselines.common import explained_variance, set_global_seeds +from baselines.common.policies import build_policy from baselines.common.runners import AbstractEnvRunner +from baselines.common.tf_util import get_session +from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer + +from mpi4py import MPI +from baselines.common.tf_util import initialize +from baselines.common.mpi_util import sync_from_root class Model(object): def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train, nsteps, ent_coef, vf_coef, max_grad_norm): - sess = tf.get_default_session() + sess = get_session() - act_model = policy(sess, ob_space, ac_space, nbatch_act, 1, reuse=False) - train_model = policy(sess, ob_space, ac_space, nbatch_train, nsteps, reuse=True) + with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE): + act_model = policy(nbatch_act, 1, sess) + train_model = policy(nbatch_train, nsteps, sess) A = train_model.pdtype.sample_placeholder([None]) ADV = tf.placeholder(tf.float32, [None]) @@ -40,14 +48,16 @@ class Model(object): approxkl = .5 * tf.reduce_mean(tf.square(neglogpac - OLDNEGLOGPAC)) clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE))) loss = pg_loss - entropy * ent_coef + vf_loss * vf_coef - with tf.variable_scope('model'): - params = tf.trainable_variables() - grads = tf.gradients(loss, params) + params = tf.trainable_variables('ppo2_model') + trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5) + grads_and_var = trainer.compute_gradients(loss, params) + grads, var = zip(*grads_and_var) + if max_grad_norm is not None: grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm) - grads = list(zip(grads, params)) - trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5) - _train = trainer.apply_gradients(grads) + grads_and_var = list(zip(grads, var)) + + _train = trainer.apply_gradients(grads_and_var) def train(lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None): advs = returns - values @@ -83,7 +93,11 @@ class Model(object): self.initial_state = act_model.initial_state self.save = save self.load = load - tf.global_variables_initializer().run(session=sess) #pylint: disable=E1101 + + if 
MPI.COMM_WORLD.Get_rank() == 0: + initialize() + global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="") + sync_from_root(sess, global_variables) #pylint: disable=E1101 class Runner(AbstractEnvRunner): @@ -97,7 +111,7 @@ class Runner(AbstractEnvRunner): mb_states = self.states epinfos = [] for _ in range(self.nsteps): - actions, values, self.states, neglogpacs = self.model.step(self.obs, self.states, self.dones) + actions, values, self.states, neglogpacs = self.model.step(self.obs, S=self.states, M=self.dones) mb_obs.append(self.obs.copy()) mb_actions.append(actions) mb_values.append(values) @@ -115,7 +129,7 @@ class Runner(AbstractEnvRunner): mb_values = np.asarray(mb_values, dtype=np.float32) mb_neglogpacs = np.asarray(mb_neglogpacs, dtype=np.float32) mb_dones = np.asarray(mb_dones, dtype=np.bool) - last_values = self.model.value(self.obs, self.states, self.dones) + last_values = self.model.value(self.obs, S=self.states, M=self.dones) #discount/bootstrap off value fn mb_returns = np.zeros_like(mb_rewards) mb_advs = np.zeros_like(mb_rewards) @@ -145,10 +159,65 @@ def constfn(val): return val return f -def learn(*, policy, env, nsteps, total_timesteps, ent_coef, lr, +def learn(*, network, env, total_timesteps, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4, vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95, log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2, - save_interval=0, load_path=None): + save_interval=0, load_path=None, **network_kwargs): + ''' + Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347) + + Parameters: + ---------- + + network: policy network architecture. Either string (mlp, lstm, lnlstm, cnn_lstm, cnn, cnn_small, conv_only - see baselines.common/models.py for full list) + specifying the standard network architecture, or a function that takes tensorflow tensor as input and returns + tuple (output_tensor, extra_feed) where output tensor is the last network layer output, extra_feed is None for feed-forward + neural nets, and extra_feed is a dictionary describing how to feed state into the network for recurrent neural nets. + See baselines.common/policies.py/lstm for more details on using recurrent nets in policies + + env: baselines.common.vec_env.VecEnv environment. Needs to be vectorized for parallel environment simulation. + The environments produced by gym.make can be wrapped using baselines.common.vec_env.DummyVecEnv class. + + + nsteps: int number of steps of the vectorized environment per update (i.e. batch size is nsteps * nenv where + nenv is number of environment copies simulated in parallel) + + total_timesteps: int number of timesteps (i.e. number of actions taken in the environment) + + ent_coef: float policy entropy coefficient in the optimization objective + + lr: float or function learning rate, constant or a schedule function [0,1] -> R+ where 1 is beginning of the + training and 0 is the end of the training. 
+ + vf_coef: float value function loss coefficient in the optimization objective + + max_grad_norm: float or None gradient norm clipping coefficient + + gamma: float discounting factor + + lam: float advantage estimation discounting factor (lambda in the paper) + + log_interval: int number of timesteps between logging events + + nminibatches: int number of training minibatches per update + + noptepochs: int number of training epochs per update + + cliprange: float or function clipping range, constant or schedule function [0,1] -> R+ where 1 is beginning of the training + and 0 is the end of the training + + save_interval: int number of timesteps between saving events + + load_path: str path to load the model from + + **network_kwargs: keyword arguments to the policy / network builder. See baselines.common/policies.py/build_policy and arguments to a particular type of network + For instance, 'mlp' network architecture has arguments num_hidden and num_layers. + + + + ''' + + set_global_seeds(seed) if isinstance(lr, float): lr = constfn(lr) else: assert callable(lr) @@ -156,6 +225,8 @@ def learn(*, policy, env, nsteps, total_timesteps, ent_coef, lr, else: assert callable(cliprange) total_timesteps = int(total_timesteps) + policy = build_policy(env, network, **network_kwargs) + nenvs = env.num_envs ob_space = env.observation_space ac_space = env.action_space @@ -180,7 +251,6 @@ def learn(*, policy, env, nsteps, total_timesteps, ent_coef, lr, nupdates = total_timesteps//nbatch for update in range(1, nupdates+1): assert nbatch % nminibatches == 0 - nbatch_train = nbatch // nminibatches tstart = time.time() frac = 1.0 - (update - 1.0) / nupdates lrnow = lr(frac) @@ -228,8 +298,9 @@ def learn(*, policy, env, nsteps, total_timesteps, ent_coef, lr, logger.logkv('time_elapsed', tnow - tfirststart) for (lossval, lossname) in zip(lossvals, model.loss_names): logger.logkv(lossname, lossval) - logger.dumpkvs() - if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir(): + if MPI.COMM_WORLD.Get_rank() == 0: + logger.dumpkvs() + if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and MPI.COMM_WORLD.Get_rank() == 0: checkdir = osp.join(logger.get_dir(), 'checkpoints') os.makedirs(checkdir, exist_ok=True) savepath = osp.join(checkdir, '%.5i'%update) @@ -240,3 +311,6 @@ def learn(*, policy, env, nsteps, total_timesteps, ent_coef, lr, def safemean(xs): return np.nan if len(xs) == 0 else np.mean(xs) + + + diff --git a/baselines/ppo2/run_atari.py b/baselines/ppo2/run_atari.py deleted file mode 100644 index 322837a..0000000 --- a/baselines/ppo2/run_atari.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python3 -import sys -from baselines import logger -from baselines.common.cmd_util import make_atari_env, atari_arg_parser -from baselines.common.vec_env.vec_frame_stack import VecFrameStack -from baselines.ppo2 import ppo2 -from baselines.ppo2.policies import CnnPolicy, LstmPolicy, LnLstmPolicy, MlpPolicy -import multiprocessing -import tensorflow as tf - - -def train(env_id, num_timesteps, seed, policy): - - ncpu = multiprocessing.cpu_count() - if sys.platform == 'darwin': ncpu //= 2 - config = tf.ConfigProto(allow_soft_placement=True, - intra_op_parallelism_threads=ncpu, - inter_op_parallelism_threads=ncpu) - config.gpu_options.allow_growth = True #pylint: disable=E1101 - tf.Session(config=config).__enter__() - - env = VecFrameStack(make_atari_env(env_id, 8, seed), 4) - policy = {'cnn' : CnnPolicy, 'lstm' : LstmPolicy, 'lnlstm' : LnLstmPolicy, 
'mlp': MlpPolicy}[policy] - ppo2.learn(policy=policy, env=env, nsteps=128, nminibatches=4, - lam=0.95, gamma=0.99, noptepochs=4, log_interval=1, - ent_coef=.01, - lr=lambda f : f * 2.5e-4, - cliprange=lambda f : f * 0.1, - total_timesteps=int(num_timesteps * 1.1)) - -def main(): - parser = atari_arg_parser() - parser.add_argument('--policy', help='Policy architecture', choices=['cnn', 'lstm', 'lnlstm', 'mlp'], default='cnn') - args = parser.parse_args() - logger.configure() - train(args.env, num_timesteps=args.num_timesteps, seed=args.seed, - policy=args.policy) - -if __name__ == '__main__': - main() diff --git a/baselines/ppo2/run_mujoco.py b/baselines/ppo2/run_mujoco.py deleted file mode 100644 index 282aa3f..0000000 --- a/baselines/ppo2/run_mujoco.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python3 -import numpy as np -from baselines.common.cmd_util import mujoco_arg_parser -from baselines import bench, logger - - -def train(env_id, num_timesteps, seed): - from baselines.common import set_global_seeds - from baselines.common.vec_env.vec_normalize import VecNormalize - from baselines.ppo2 import ppo2 - from baselines.ppo2.policies import MlpPolicy - import gym - import tensorflow as tf - from baselines.common.vec_env.dummy_vec_env import DummyVecEnv - ncpu = 1 - config = tf.ConfigProto(allow_soft_placement=True, - intra_op_parallelism_threads=ncpu, - inter_op_parallelism_threads=ncpu) - tf.Session(config=config).__enter__() - - def make_env(): - env = gym.make(env_id) - env = bench.Monitor(env, logger.get_dir(), allow_early_resets=True) - return env - - env = DummyVecEnv([make_env]) - env = VecNormalize(env) - - set_global_seeds(seed) - policy = MlpPolicy - model = ppo2.learn(policy=policy, env=env, nsteps=2048, nminibatches=32, - lam=0.95, gamma=0.99, noptepochs=10, log_interval=1, - ent_coef=0.0, - lr=3e-4, - cliprange=0.2, - total_timesteps=num_timesteps) - - return model, env - - -def main(): - args = mujoco_arg_parser().parse_args() - logger.configure() - model, env = train(args.env, num_timesteps=args.num_timesteps, seed=args.seed) - - if args.play: - logger.log("Running trained model") - obs = np.zeros((env.num_envs,) + env.observation_space.shape) - obs[:] = env.reset() - while True: - actions = model.step(obs)[0] - obs[:] = env.step(actions)[0] - env.render() - - -if __name__ == '__main__': - main() diff --git a/baselines/trpo_mpi/nosharing_cnn_policy.py b/baselines/trpo_mpi/nosharing_cnn_policy.py deleted file mode 100644 index 97b2dcd..0000000 --- a/baselines/trpo_mpi/nosharing_cnn_policy.py +++ /dev/null @@ -1,56 +0,0 @@ -import baselines.common.tf_util as U -import tensorflow as tf -import gym -from baselines.common.distributions import make_pdtype - -class CnnPolicy(object): - recurrent = False - def __init__(self, name, ob_space, ac_space): - with tf.variable_scope(name): - self._init(ob_space, ac_space) - self.scope = tf.get_variable_scope().name - - def _init(self, ob_space, ac_space): - assert isinstance(ob_space, gym.spaces.Box) - - self.pdtype = pdtype = make_pdtype(ac_space) - sequence_length = None - - ob = U.get_placeholder(name="ob", dtype=tf.float32, shape=[sequence_length] + list(ob_space.shape)) - - obscaled = ob / 255.0 - - with tf.variable_scope("pol"): - x = obscaled - x = tf.nn.relu(U.conv2d(x, 8, "l1", [8, 8], [4, 4], pad="VALID")) - x = tf.nn.relu(U.conv2d(x, 16, "l2", [4, 4], [2, 2], pad="VALID")) - x = U.flattenallbut0(x) - x = tf.nn.relu(tf.layers.dense(x, 128, name='lin', kernel_initializer=U.normc_initializer(1.0))) - logits = 
tf.layers.dense(x, pdtype.param_shape()[0], name='logits', kernel_initializer=U.normc_initializer(0.01)) - self.pd = pdtype.pdfromflat(logits) - with tf.variable_scope("vf"): - x = obscaled - x = tf.nn.relu(U.conv2d(x, 8, "l1", [8, 8], [4, 4], pad="VALID")) - x = tf.nn.relu(U.conv2d(x, 16, "l2", [4, 4], [2, 2], pad="VALID")) - x = U.flattenallbut0(x) - x = tf.nn.relu(tf.layers.dense(x, 128, name='lin', kernel_initializer=U.normc_initializer(1.0))) - self.vpred = tf.layers.dense(x, 1, name='value', kernel_initializer=U.normc_initializer(1.0)) - self.vpredz = self.vpred - - self.state_in = [] - self.state_out = [] - - stochastic = tf.placeholder(dtype=tf.bool, shape=()) - ac = self.pd.sample() - self._act = U.function([stochastic, ob], [ac, self.vpred]) - - def act(self, stochastic, ob): - ac1, vpred1 = self._act(stochastic, ob[None]) - return ac1[0], vpred1[0] - def get_variables(self): - return tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, self.scope) - def get_trainable_variables(self): - return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, self.scope) - def get_initial_state(self): - return [] - diff --git a/baselines/trpo_mpi/run_atari.py b/baselines/trpo_mpi/run_atari.py deleted file mode 100644 index f31ebfd..0000000 --- a/baselines/trpo_mpi/run_atari.py +++ /dev/null @@ -1,43 +0,0 @@ - #!/usr/bin/env python3 -from mpi4py import MPI -from baselines.common import set_global_seeds -import os.path as osp -import gym, logging -from baselines import logger -from baselines import bench -from baselines.common.atari_wrappers import make_atari, wrap_deepmind -from baselines.common.cmd_util import atari_arg_parser - -def train(env_id, num_timesteps, seed): - from baselines.trpo_mpi.nosharing_cnn_policy import CnnPolicy - from baselines.trpo_mpi import trpo_mpi - import baselines.common.tf_util as U - rank = MPI.COMM_WORLD.Get_rank() - sess = U.single_threaded_session() - sess.__enter__() - if rank == 0: - logger.configure() - else: - logger.configure(format_strs=[]) - - workerseed = seed + 10000 * MPI.COMM_WORLD.Get_rank() - set_global_seeds(workerseed) - env = make_atari(env_id) - def policy_fn(name, ob_space, ac_space): #pylint: disable=W0613 - return CnnPolicy(name=name, ob_space=env.observation_space, ac_space=env.action_space) - env = bench.Monitor(env, logger.get_dir() and osp.join(logger.get_dir(), str(rank))) - env.seed(workerseed) - - env = wrap_deepmind(env) - env.seed(workerseed) - - trpo_mpi.learn(env, policy_fn, timesteps_per_batch=512, max_kl=0.001, cg_iters=10, cg_damping=1e-3, - max_timesteps=int(num_timesteps * 1.1), gamma=0.98, lam=1.0, vf_iters=3, vf_stepsize=1e-4, entcoeff=0.00) - env.close() - -def main(): - args = atari_arg_parser().parse_args() - train(args.env, num_timesteps=args.num_timesteps, seed=args.seed) - -if __name__ == "__main__": - main() diff --git a/baselines/trpo_mpi/run_mujoco.py b/baselines/trpo_mpi/run_mujoco.py deleted file mode 100644 index 220bb91..0000000 --- a/baselines/trpo_mpi/run_mujoco.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 -# noinspection PyUnresolvedReferences -from mpi4py import MPI -from baselines.common.cmd_util import make_mujoco_env, mujoco_arg_parser -from baselines import logger -from baselines.ppo1.mlp_policy import MlpPolicy -from baselines.trpo_mpi import trpo_mpi - -def train(env_id, num_timesteps, seed): - import baselines.common.tf_util as U - sess = U.single_threaded_session() - sess.__enter__() - - rank = MPI.COMM_WORLD.Get_rank() - if rank == 0: - logger.configure() - else: - 
logger.configure(format_strs=[]) - logger.set_level(logger.DISABLED) - workerseed = seed + 10000 * MPI.COMM_WORLD.Get_rank() - def policy_fn(name, ob_space, ac_space): - return MlpPolicy(name=name, ob_space=ob_space, ac_space=ac_space, - hid_size=32, num_hid_layers=2) - env = make_mujoco_env(env_id, workerseed) - trpo_mpi.learn(env, policy_fn, timesteps_per_batch=1024, max_kl=0.01, cg_iters=10, cg_damping=0.1, - max_timesteps=num_timesteps, gamma=0.99, lam=0.98, vf_iters=5, vf_stepsize=1e-3) - env.close() - -def main(): - args = mujoco_arg_parser().parse_args() - train(args.env, num_timesteps=args.num_timesteps, seed=args.seed) - - -if __name__ == '__main__': - main() - diff --git a/baselines/trpo_mpi/trpo_mpi.py b/baselines/trpo_mpi/trpo_mpi.py index e23d9ac..96e7809 100644 --- a/baselines/trpo_mpi/trpo_mpi.py +++ b/baselines/trpo_mpi/trpo_mpi.py @@ -6,8 +6,11 @@ import time from baselines.common import colorize from mpi4py import MPI from collections import deque +from baselines.common import set_global_seeds from baselines.common.mpi_adam import MpiAdam from baselines.common.cg import cg +from baselines.common.input import observation_placeholder +from baselines.common.policies import build_policy from contextlib import contextmanager def traj_segment_generator(pi, env, horizon, stochastic): @@ -33,7 +36,7 @@ def traj_segment_generator(pi, env, horizon, stochastic): while True: prevac = ac - ac, vpred = pi.act(stochastic, ob) + ac, vpred, _, _ = pi.step(ob, stochastic=stochastic) # Slight weirdness here because we need value function at time T # before returning segment [0, T-1] so we get the correct # terminal value @@ -41,7 +44,7 @@ def traj_segment_generator(pi, env, horizon, stochastic): yield {"ob" : obs, "rew" : rews, "vpred" : vpreds, "new" : news, "ac" : acs, "prevac" : prevacs, "nextvpred": vpred * (1 - new), "ep_rets" : ep_rets, "ep_lens" : ep_lens} - _, vpred = pi.act(stochastic, ob) + _, vpred, _, _ = pi.step(ob, stochastic=stochastic) # Be careful!!! if you change the downstream algorithm to aggregate # several of these batches, then be sure to do a deepcopy ep_rets = [] @@ -79,30 +82,95 @@ def add_vtarg_and_adv(seg, gamma, lam): gaelam[t] = lastgaelam = delta + gamma * lam * nonterminal * lastgaelam seg["tdlamret"] = seg["adv"] + seg["vpred"] -def learn(env, policy_fn, *, - timesteps_per_batch, # what to train on - max_kl, cg_iters, - gamma, lam, # advantage estimation +def learn(*, + network, + env, + total_timesteps, + timesteps_per_batch=1024, # what to train on + max_kl=0.001, + cg_iters=10, + gamma=0.99, + lam=1.0, # advantage estimation + seed=None, entcoeff=0.0, cg_damping=1e-2, vf_stepsize=3e-4, vf_iters =3, - max_timesteps=0, max_episodes=0, max_iters=0, # time constraint - callback=None + max_episodes=0, max_iters=0, # time constraint + callback=None, + **network_kwargs ): + ''' + learn a policy function with TRPO algorithm + + Parameters: + ---------- + + network neural network to learn. 
Can be either string ('mlp', 'cnn', 'lstm', 'lnlstm' for basic types) + or function that takes input placeholder and returns tuple (output, None) for feedforward nets + or (output, (state_placeholder, state_output, mask_placeholder)) for recurrent nets + + env environment (one of the gym environments or wrapped via baselines.common.vec_env.VecEnv-type class + + timesteps_per_batch timesteps per gradient estimation batch + + max_kl max KL divergence between old policy and new policy ( KL(pi_old || pi) ) + + entcoeff coefficient of policy entropy term in the optimization objective + + cg_iters number of iterations of conjugate gradient algorithm + + cg_damping conjugate gradient damping + + vf_stepsize learning rate for adam optimizer used to optimie value function loss + + vf_iters number of iterations of value function optimization iterations per each policy optimization step + + total_timesteps max number of timesteps + + max_episodes max number of episodes + + max_iters maximum number of policy optimization iterations + + callback function to be called with (locals(), globals()) each policy optimization step + + Returns: + ------- + + learnt model + + ''' + + nworkers = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() + + cpus_per_worker = 1 + U.get_session(config=tf.ConfigProto( + allow_soft_placement=True, + inter_op_parallelism_threads=cpus_per_worker, + intra_op_parallelism_threads=cpus_per_worker + )) + + + policy = build_policy(env, network, value_network='copy', **network_kwargs) + set_global_seeds(seed) + np.set_printoptions(precision=3) # Setup losses and stuff # ---------------------------------------- ob_space = env.observation_space ac_space = env.action_space - pi = policy_fn("pi", ob_space, ac_space) - oldpi = policy_fn("oldpi", ob_space, ac_space) + + ob = observation_placeholder(ob_space) + with tf.variable_scope("pi"): + pi = policy(observ_placeholder=ob) + with tf.variable_scope("oldpi"): + oldpi = policy(observ_placeholder=ob) + atarg = tf.placeholder(dtype=tf.float32, shape=[None]) # Target advantage function (if applicable) ret = tf.placeholder(dtype=tf.float32, shape=[None]) # Empirical return - ob = U.get_placeholder_cached(name="ob") ac = pi.pdtype.sample_placeholder([None]) kloldnew = oldpi.pd.kl(pi.pd) @@ -111,7 +179,7 @@ def learn(env, policy_fn, *, meanent = tf.reduce_mean(ent) entbonus = entcoeff * meanent - vferr = tf.reduce_mean(tf.square(pi.vpred - ret)) + vferr = tf.reduce_mean(tf.square(pi.vf - ret)) ratio = tf.exp(pi.pd.logp(ac) - oldpi.pd.logp(ac)) # advantage * pnew / pold surrgain = tf.reduce_mean(ratio * atarg) @@ -122,9 +190,12 @@ def learn(env, policy_fn, *, dist = meankl - all_var_list = pi.get_trainable_variables() - var_list = [v for v in all_var_list if v.name.split("/")[1].startswith("pol")] - vf_var_list = [v for v in all_var_list if v.name.split("/")[1].startswith("vf")] + all_var_list = get_trainable_variables("pi") + # var_list = [v for v in all_var_list if v.name.split("/")[1].startswith("pol")] + # vf_var_list = [v for v in all_var_list if v.name.split("/")[1].startswith("vf")] + var_list = get_pi_trainable_variables("pi") + vf_var_list = get_vf_trainable_variables("pi") + vfadam = MpiAdam(vf_var_list) get_flat = U.GetFlat(var_list) @@ -142,7 +213,8 @@ def learn(env, policy_fn, *, fvp = U.flatgrad(gvp, var_list) assign_old_eq_new = U.function([],[], updates=[tf.assign(oldv, newv) - for (oldv, newv) in zipsame(oldpi.get_variables(), pi.get_variables())]) + for (oldv, newv) in zipsame(get_variables("oldpi"), 
get_variables("pi"))]) + compute_losses = U.function([ob, ac, atarg], losses) compute_lossandgrad = U.function([ob, ac, atarg], losses + [U.flatgrad(optimgain, var_list)]) compute_fvp = U.function([flat_tangent, ob, ac, atarg], fvp) @@ -183,11 +255,11 @@ def learn(env, policy_fn, *, lenbuffer = deque(maxlen=40) # rolling buffer for episode lengths rewbuffer = deque(maxlen=40) # rolling buffer for episode rewards - assert sum([max_iters>0, max_timesteps>0, max_episodes>0])==1 + assert sum([max_iters>0, total_timesteps>0, max_episodes>0])==1 while True: if callback: callback(locals(), globals()) - if max_timesteps and timesteps_so_far >= max_timesteps: + if total_timesteps and timesteps_so_far >= total_timesteps: break elif max_episodes and episodes_so_far >= max_episodes: break @@ -287,5 +359,20 @@ def learn(env, policy_fn, *, if rank==0: logger.dump_tabular() + return pi + def flatten_lists(listoflists): - return [el for list_ in listoflists for el in list_] \ No newline at end of file + return [el for list_ in listoflists for el in list_] + +def get_variables(scope): + return tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope) + +def get_trainable_variables(scope): + return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope) + +def get_vf_trainable_variables(scope): + return [v for v in get_trainable_variables(scope) if 'vf' in v.name[len(scope):].split('/')] + +def get_pi_trainable_variables(scope): + return [v for v in get_trainable_variables(scope) if 'pi' in v.name[len(scope):].split('/')] + diff --git a/setup.py b/setup.py index bf8badc..35673ce 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,6 @@ setup(name='baselines', 'scipy', 'tqdm', 'joblib', - 'zmq', 'dill', 'progressbar2', 'mpi4py', @@ -23,6 +22,12 @@ setup(name='baselines', 'click', 'opencv-python' ], + extras_require={ + 'test': [ + 'filelock', + 'pytest' + ] + }, description='OpenAI baselines: high quality implementations of reinforcement learning algorithms', author='OpenAI', url='https://github.com/openai/baselines',