From 7edac38c73c8f4e76a8a481de55864594a330978 Mon Sep 17 00:00:00 2001 From: peter Date: Thu, 26 Jul 2018 14:26:57 -0700 Subject: [PATCH] more stuff from rl-algs --- Dockerfile | 2 +- baselines/a2c/runner.py | 60 +++ baselines/acer/acer.py | 379 ++++++++++++++++++ baselines/acktr/acktr.py | 1 + baselines/common/models.py | 177 ++++++++ baselines/common/mpi_adam_optimizer.py | 31 ++ baselines/common/mpi_util.py | 101 +++++ baselines/common/policies.py | 190 +++++++++ baselines/common/retro_wrappers.py | 293 ++++++++++++++ baselines/common/tests/__init__.py | 0 baselines/common/tests/envs/__init__.py | 0 .../common/tests/envs/fixed_sequence_env.py | 44 ++ baselines/common/tests/envs/identity_env.py | 70 ++++ baselines/common/tests/envs/mnist_env.py | 70 ++++ baselines/common/tests/test_cartpole.py | 45 +++ baselines/common/tests/test_fixed_sequence.py | 56 +++ baselines/common/tests/test_identity.py | 55 +++ baselines/common/tests/test_mnist.py | 50 +++ baselines/common/tests/test_serialization.py | 84 ++++ baselines/common/tests/util.py | 73 ++++ baselines/deepq/deepq.py | 320 +++++++++++++++ baselines/deepq/defaults.py | 21 + baselines/deepq/experiments/enjoy_retro.py | 34 ++ baselines/deepq/experiments/run_retro.py | 49 +++ baselines/run.py | 211 ++++++++++ conftest.py | 19 + 26 files changed, 2434 insertions(+), 1 deletion(-) create mode 100644 baselines/a2c/runner.py create mode 100644 baselines/acer/acer.py create mode 100644 baselines/acktr/acktr.py create mode 100644 baselines/common/models.py create mode 100644 baselines/common/mpi_adam_optimizer.py create mode 100644 baselines/common/mpi_util.py create mode 100644 baselines/common/policies.py create mode 100644 baselines/common/retro_wrappers.py create mode 100644 baselines/common/tests/__init__.py create mode 100644 baselines/common/tests/envs/__init__.py create mode 100644 baselines/common/tests/envs/fixed_sequence_env.py create mode 100644 baselines/common/tests/envs/identity_env.py create mode 100644 baselines/common/tests/envs/mnist_env.py create mode 100644 baselines/common/tests/test_cartpole.py create mode 100644 baselines/common/tests/test_fixed_sequence.py create mode 100644 baselines/common/tests/test_identity.py create mode 100644 baselines/common/tests/test_mnist.py create mode 100644 baselines/common/tests/test_serialization.py create mode 100644 baselines/common/tests/util.py create mode 100644 baselines/deepq/deepq.py create mode 100644 baselines/deepq/defaults.py create mode 100644 baselines/deepq/experiments/enjoy_retro.py create mode 100644 baselines/deepq/experiments/run_retro.py create mode 100644 baselines/run.py create mode 100644 conftest.py diff --git a/Dockerfile b/Dockerfile index c16646b..1d432f3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM ubuntu:16.04 -RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake +RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake python-opencv ENV CODE_DIR /root/code ENV VENV /root/venv diff --git a/baselines/a2c/runner.py b/baselines/a2c/runner.py new file mode 100644 index 0000000..60b5e1d --- /dev/null +++ b/baselines/a2c/runner.py @@ -0,0 +1,60 @@ +import numpy as np +from baselines.a2c.utils import discount_with_dones +from baselines.common.runners import AbstractEnvRunner + +class Runner(AbstractEnvRunner): + + def __init__(self, env, model, nsteps=5, gamma=0.99): + super().__init__(env=env, model=model, nsteps=nsteps) + 
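+        # gamma is the discount factor used in run() below: rollout rewards are
+        # turned into returns with discount_with_dones and bootstrapped from the
+        # value function; a gamma of 0.0 skips that discounting/bootstrap step.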
self.gamma = gamma + self.batch_action_shape = [x if x is not None else -1 for x in model.train_model.action.shape.as_list()] + self.ob_dtype = model.train_model.X.dtype.as_numpy_dtype + + def run(self): + mb_obs, mb_rewards, mb_actions, mb_values, mb_dones = [],[],[],[],[] + mb_states = self.states + for n in range(self.nsteps): + actions, values, states, _ = self.model.step(self.obs, S=self.states, M=self.dones) + mb_obs.append(np.copy(self.obs)) + mb_actions.append(actions) + mb_values.append(values) + mb_dones.append(self.dones) + obs, rewards, dones, _ = self.env.step(actions) + self.states = states + self.dones = dones + for n, done in enumerate(dones): + if done: + self.obs[n] = self.obs[n]*0 + self.obs = obs + mb_rewards.append(rewards) + mb_dones.append(self.dones) + #batch of steps to batch of rollouts + + mb_obs = np.asarray(mb_obs, dtype=self.ob_dtype).swapaxes(1, 0).reshape(self.batch_ob_shape) + mb_rewards = np.asarray(mb_rewards, dtype=np.float32).swapaxes(1, 0) + mb_actions = np.asarray(mb_actions, dtype=self.model.train_model.action.dtype.name).swapaxes(1, 0) + mb_values = np.asarray(mb_values, dtype=np.float32).swapaxes(1, 0) + mb_dones = np.asarray(mb_dones, dtype=np.bool).swapaxes(1, 0) + mb_masks = mb_dones[:, :-1] + mb_dones = mb_dones[:, 1:] + + + if self.gamma > 0.0: + #discount/bootstrap off value fn + last_values = self.model.value(self.obs, S=self.states, M=self.dones).tolist() + for n, (rewards, dones, value) in enumerate(zip(mb_rewards, mb_dones, last_values)): + rewards = rewards.tolist() + dones = dones.tolist() + if dones[-1] == 0: + rewards = discount_with_dones(rewards+[value], dones+[0], self.gamma)[:-1] + else: + rewards = discount_with_dones(rewards, dones, self.gamma) + + mb_rewards[n] = rewards + + mb_actions = mb_actions.reshape(self.batch_action_shape) + + mb_rewards = mb_rewards.flatten() + mb_values = mb_values.flatten() + mb_masks = mb_masks.flatten() + return mb_obs, mb_states, mb_rewards, mb_masks, mb_actions, mb_values diff --git a/baselines/acer/acer.py b/baselines/acer/acer.py new file mode 100644 index 0000000..170d18b --- /dev/null +++ b/baselines/acer/acer.py @@ -0,0 +1,379 @@ +import time +import joblib +import numpy as np +import tensorflow as tf +from baselines import logger + +from baselines.common import set_global_seeds +from baselines.common.policies import build_policy +from baselines.common.tf_util import get_session + +from baselines.a2c.utils import batch_to_seq, seq_to_batch +from baselines.a2c.utils import cat_entropy_softmax +from baselines.a2c.utils import Scheduler, make_path, find_trainable_variables +from baselines.a2c.utils import EpisodeStats +from baselines.a2c.utils import get_by_index, check_shape, avg_norm, gradient_add, q_explained_variance +from baselines.acer.buffer import Buffer +from baselines.acer.runner import Runner + +import os.path as osp + +# remove last step +def strip(var, nenvs, nsteps, flat = False): + vars = batch_to_seq(var, nenvs, nsteps + 1, flat) + return seq_to_batch(vars[:-1], flat) + +def q_retrace(R, D, q_i, v, rho_i, nenvs, nsteps, gamma): + """ + Calculates q_retrace targets + + :param R: Rewards + :param D: Dones + :param q_i: Q values for actions taken + :param v: V values + :param rho_i: Importance weight for each action + :return: Q_retrace values + """ + rho_bar = batch_to_seq(tf.minimum(1.0, rho_i), nenvs, nsteps, True) # list of len steps, shape [nenvs] + rs = batch_to_seq(R, nenvs, nsteps, True) # list of len steps, shape [nenvs] + ds = batch_to_seq(D, nenvs, nsteps, True) # list 
of len steps, shape [nenvs] + q_is = batch_to_seq(q_i, nenvs, nsteps, True) + vs = batch_to_seq(v, nenvs, nsteps + 1, True) + v_final = vs[-1] + qret = v_final + qrets = [] + for i in range(nsteps - 1, -1, -1): + check_shape([qret, ds[i], rs[i], rho_bar[i], q_is[i], vs[i]], [[nenvs]] * 6) + qret = rs[i] + gamma * qret * (1.0 - ds[i]) + qrets.append(qret) + qret = (rho_bar[i] * (qret - q_is[i])) + vs[i] + qrets = qrets[::-1] + qret = seq_to_batch(qrets, flat=True) + return qret + +# For ACER with PPO clipping instead of trust region +# def clip(ratio, eps_clip): +# # assume 0 <= eps_clip <= 1 +# return tf.minimum(1 + eps_clip, tf.maximum(1 - eps_clip, ratio)) + +class Model(object): + def __init__(self, policy, ob_space, ac_space, nenvs, nsteps, nstack, num_procs, + ent_coef, q_coef, gamma, max_grad_norm, lr, + rprop_alpha, rprop_epsilon, total_timesteps, lrschedule, + c, trust_region, alpha, delta): + + sess = get_session() + nact = ac_space.n + nbatch = nenvs * nsteps + + A = tf.placeholder(tf.int32, [nbatch]) # actions + D = tf.placeholder(tf.float32, [nbatch]) # dones + R = tf.placeholder(tf.float32, [nbatch]) # rewards, not returns + MU = tf.placeholder(tf.float32, [nbatch, nact]) # mu's + LR = tf.placeholder(tf.float32, []) + eps = 1e-6 + + step_ob_placeholder = tf.placeholder(dtype=ob_space.dtype, shape=(nenvs,) + ob_space.shape[:-1] + (ob_space.shape[-1] * nstack,)) + train_ob_placeholder = tf.placeholder(dtype=ob_space.dtype, shape=(nenvs*(nsteps+1),) + ob_space.shape[:-1] + (ob_space.shape[-1] * nstack,)) + with tf.variable_scope('acer_model', reuse=tf.AUTO_REUSE): + + step_model = policy(observ_placeholder=step_ob_placeholder, sess=sess) + train_model = policy(observ_placeholder=train_ob_placeholder, sess=sess) + + + params = find_trainable_variables("acer_model") + print("Params {}".format(len(params))) + for var in params: + print(var) + + # create polyak averaged model + ema = tf.train.ExponentialMovingAverage(alpha) + ema_apply_op = ema.apply(params) + + def custom_getter(getter, *args, **kwargs): + v = ema.average(getter(*args, **kwargs)) + print(v.name) + return v + + with tf.variable_scope("acer_model", custom_getter=custom_getter, reuse=True): + polyak_model = policy(observ_placeholder=train_ob_placeholder, sess=sess) + + # Notation: (var) = batch variable, (var)s = seqeuence variable, (var)_i = variable index by action at step i + + # action probability distributions according to train_model, polyak_model and step_model + # poilcy.pi is probability distribution parameters; to obtain distribution that sums to 1 need to take softmax + train_model_p = tf.nn.softmax(train_model.pi) + polyak_model_p = tf.nn.softmax(polyak_model.pi) + step_model_p = tf.nn.softmax(step_model.pi) + v = tf.reduce_sum(train_model_p * train_model.q, axis = -1) # shape is [nenvs * (nsteps + 1)] + + # strip off last step + f, f_pol, q = map(lambda var: strip(var, nenvs, nsteps), [train_model_p, polyak_model_p, train_model.q]) + # Get pi and q values for actions taken + f_i = get_by_index(f, A) + q_i = get_by_index(q, A) + + # Compute ratios for importance truncation + rho = f / (MU + eps) + rho_i = get_by_index(rho, A) + + # Calculate Q_retrace targets + qret = q_retrace(R, D, q_i, v, rho_i, nenvs, nsteps, gamma) + + # Calculate losses + # Entropy + # entropy = tf.reduce_mean(strip(train_model.pd.entropy(), nenvs, nsteps)) + entropy = tf.reduce_mean(cat_entropy_softmax(f)) + + # Policy Graident loss, with truncated importance sampling & bias correction + v = strip(v, nenvs, nsteps, True) + 
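+        # The policy loss below implements truncated importance sampling with
+        # bias correction (https://arxiv.org/abs/1611.01224): gain_f weights the
+        # taken actions by importance ratios clipped at c, while gain_bc sums
+        # over all actions with weight relu(1 - c/rho) * pi(a|s), so it is only
+        # active where the unclipped ratio exceeds c.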
check_shape([qret, v, rho_i, f_i], [[nenvs * nsteps]] * 4) + check_shape([rho, f, q], [[nenvs * nsteps, nact]] * 2) + + # Truncated importance sampling + adv = qret - v + logf = tf.log(f_i + eps) + gain_f = logf * tf.stop_gradient(adv * tf.minimum(c, rho_i)) # [nenvs * nsteps] + loss_f = -tf.reduce_mean(gain_f) + + # Bias correction for the truncation + adv_bc = (q - tf.reshape(v, [nenvs * nsteps, 1])) # [nenvs * nsteps, nact] + logf_bc = tf.log(f + eps) # / (f_old + eps) + check_shape([adv_bc, logf_bc], [[nenvs * nsteps, nact]]*2) + gain_bc = tf.reduce_sum(logf_bc * tf.stop_gradient(adv_bc * tf.nn.relu(1.0 - (c / (rho + eps))) * f), axis = 1) #IMP: This is sum, as expectation wrt f + loss_bc= -tf.reduce_mean(gain_bc) + + loss_policy = loss_f + loss_bc + + # Value/Q function loss, and explained variance + check_shape([qret, q_i], [[nenvs * nsteps]]*2) + ev = q_explained_variance(tf.reshape(q_i, [nenvs, nsteps]), tf.reshape(qret, [nenvs, nsteps])) + loss_q = tf.reduce_mean(tf.square(tf.stop_gradient(qret) - q_i)*0.5) + + # Net loss + check_shape([loss_policy, loss_q, entropy], [[]] * 3) + loss = loss_policy + q_coef * loss_q - ent_coef * entropy + + if trust_region: + g = tf.gradients(- (loss_policy - ent_coef * entropy) * nsteps * nenvs, f) #[nenvs * nsteps, nact] + # k = tf.gradients(KL(f_pol || f), f) + k = - f_pol / (f + eps) #[nenvs * nsteps, nact] # Directly computed gradient of KL divergence wrt f + k_dot_g = tf.reduce_sum(k * g, axis=-1) + adj = tf.maximum(0.0, (tf.reduce_sum(k * g, axis=-1) - delta) / (tf.reduce_sum(tf.square(k), axis=-1) + eps)) #[nenvs * nsteps] + + # Calculate stats (before doing adjustment) for logging. + avg_norm_k = avg_norm(k) + avg_norm_g = avg_norm(g) + avg_norm_k_dot_g = tf.reduce_mean(tf.abs(k_dot_g)) + avg_norm_adj = tf.reduce_mean(tf.abs(adj)) + + g = g - tf.reshape(adj, [nenvs * nsteps, 1]) * k + grads_f = -g/(nenvs*nsteps) # These are turst region adjusted gradients wrt f ie statistics of policy pi + grads_policy = tf.gradients(f, params, grads_f) + grads_q = tf.gradients(loss_q * q_coef, params) + grads = [gradient_add(g1, g2, param) for (g1, g2, param) in zip(grads_policy, grads_q, params)] + + avg_norm_grads_f = avg_norm(grads_f) * (nsteps * nenvs) + norm_grads_q = tf.global_norm(grads_q) + norm_grads_policy = tf.global_norm(grads_policy) + else: + grads = tf.gradients(loss, params) + + if max_grad_norm is not None: + grads, norm_grads = tf.clip_by_global_norm(grads, max_grad_norm) + grads = list(zip(grads, params)) + trainer = tf.train.RMSPropOptimizer(learning_rate=LR, decay=rprop_alpha, epsilon=rprop_epsilon) + _opt_op = trainer.apply_gradients(grads) + + # so when you call _train, you first do the gradient step, then you apply ema + with tf.control_dependencies([_opt_op]): + _train = tf.group(ema_apply_op) + + lr = Scheduler(v=lr, nvalues=total_timesteps, schedule=lrschedule) + + # Ops/Summaries to run, and their names for logging + run_ops = [_train, loss, loss_q, entropy, loss_policy, loss_f, loss_bc, ev, norm_grads] + names_ops = ['loss', 'loss_q', 'entropy', 'loss_policy', 'loss_f', 'loss_bc', 'explained_variance', + 'norm_grads'] + if trust_region: + run_ops = run_ops + [norm_grads_q, norm_grads_policy, avg_norm_grads_f, avg_norm_k, avg_norm_g, avg_norm_k_dot_g, + avg_norm_adj] + names_ops = names_ops + ['norm_grads_q', 'norm_grads_policy', 'avg_norm_grads_f', 'avg_norm_k', 'avg_norm_g', + 'avg_norm_k_dot_g', 'avg_norm_adj'] + + def train(obs, actions, rewards, dones, mus, states, masks, steps): + cur_lr = lr.value_steps(steps) + td_map = 
{train_model.X: obs, polyak_model.X: obs, A: actions, R: rewards, D: dones, MU: mus, LR: cur_lr} + if states is not None: + td_map[train_model.S] = states + td_map[train_model.M] = masks + td_map[polyak_model.S] = states + td_map[polyak_model.M] = masks + + return names_ops, sess.run(run_ops, td_map)[1:] # strip off _train + + def save(save_path): + ps = sess.run(params) + make_path(osp.dirname(save_path)) + joblib.dump(ps, save_path) + + def _step(observation, **kwargs): + return step_model._evaluate([step_model.action, step_model_p, step_model.state], observation, **kwargs) + + + + self.train = train + self.save = save + self.train_model = train_model + self.step_model = step_model + self._step = _step + self.step = self.step_model.step + + self.initial_state = step_model.initial_state + tf.global_variables_initializer().run(session=sess) + + +class Acer(): + def __init__(self, runner, model, buffer, log_interval): + self.runner = runner + self.model = model + self.buffer = buffer + self.log_interval = log_interval + self.tstart = None + self.episode_stats = EpisodeStats(runner.nsteps, runner.nenv) + self.steps = None + + def call(self, on_policy): + runner, model, buffer, steps = self.runner, self.model, self.buffer, self.steps + if on_policy: + enc_obs, obs, actions, rewards, mus, dones, masks = runner.run() + self.episode_stats.feed(rewards, dones) + if buffer is not None: + buffer.put(enc_obs, actions, rewards, mus, dones, masks) + else: + # get obs, actions, rewards, mus, dones from buffer. + obs, actions, rewards, mus, dones, masks = buffer.get() + + # reshape stuff correctly + obs = obs.reshape(runner.batch_ob_shape) + actions = actions.reshape([runner.nbatch]) + rewards = rewards.reshape([runner.nbatch]) + mus = mus.reshape([runner.nbatch, runner.nact]) + dones = dones.reshape([runner.nbatch]) + masks = masks.reshape([runner.batch_ob_shape[0]]) + + names_ops, values_ops = model.train(obs, actions, rewards, dones, mus, model.initial_state, masks, steps) + + if on_policy and (int(steps/runner.nbatch) % self.log_interval == 0): + logger.record_tabular("total_timesteps", steps) + logger.record_tabular("fps", int(steps/(time.time() - self.tstart))) + # IMP: In EpisodicLife env, during training, we get done=True at each loss of life, not just at the terminal state. + # Thus, this is mean until end of life, not end of episode. + # For true episode rewards, see the monitor files in the log folder. + logger.record_tabular("mean_episode_length", self.episode_stats.mean_length()) + logger.record_tabular("mean_episode_reward", self.episode_stats.mean_reward()) + for name, val in zip(names_ops, values_ops): + logger.record_tabular(name, float(val)) + logger.dump_tabular() + + +def learn(network, env, seed=None, nsteps=20, nstack=4, total_timesteps=int(80e6), q_coef=0.5, ent_coef=0.01, + max_grad_norm=10, lr=7e-4, lrschedule='linear', rprop_epsilon=1e-5, rprop_alpha=0.99, gamma=0.99, + log_interval=100, buffer_size=50000, replay_ratio=4, replay_start=10000, c=10.0, + trust_region=True, alpha=0.99, delta=1, **network_kwargs): + + ''' + Main entrypoint for ACER (Actor-Critic with Experience Replay) algorithm (https://arxiv.org/pdf/1611.01224.pdf) + Train an agent with given network architecture on a given environment using ACER. + + Parameters: + ---------- + + network: policy network architecture. 
Either string (mlp, lstm, lnlstm, cnn_lstm, cnn, cnn_small, conv_only - see baselines.common/models.py for full list) + specifying the standard network architecture, or a function that takes tensorflow tensor as input and returns + tuple (output_tensor, extra_feed) where output tensor is the last network layer output, extra_feed is None for feed-forward + neural nets, and extra_feed is a dictionary describing how to feed state into the network for recurrent neural nets. + See baselines.common/policies.py/lstm for more details on using recurrent nets in policies + + env: environment. Needs to be vectorized for parallel environment simulation. + The environments produced by gym.make can be wrapped using baselines.common.vec_env.DummyVecEnv class. + + nsteps: int, number of steps of the vectorized environment per update (i.e. batch size is nsteps * nenv where + nenv is number of environment copies simulated in parallel) (default: 20) + + nstack: int, size of the frame stack, i.e. number of the frames passed to the step model. Frames are stacked along channel dimension + (last image dimension) (default: 4) + + total_timesteps: int, number of timesteps (i.e. number of actions taken in the environment) (default: 80M) + + q_coef: float, value function loss coefficient in the optimization objective (analog of vf_coef for other actor-critic methods) + + ent_coef: float, policy entropy coefficient in the optimization objective (default: 0.01) + + max_grad_norm: float, gradient norm clipping coefficient. If set to None, no clipping. (default: 10), + + lr: float, learning rate for RMSProp (current implementation has RMSProp hardcoded in) (default: 7e-4) + + lrschedule: schedule of learning rate. Can be 'linear', 'constant', or a function [0..1] -> [0..1] that takes fraction of the training progress as input and + returns fraction of the learning rate (specified as lr) as output + + rprop_epsilon: float, RMSProp epsilon (stabilizes square root computation in denominator of RMSProp update) (default: 1e-5) + + rprop_alpha: float, RMSProp decay parameter (default: 0.99) + + gamma: float, reward discounting factor (default: 0.99) + + log_interval: int, number of updates between logging events (default: 100) + + buffer_size: int, size of the replay buffer (default: 50k) + + replay_ratio: int, now many (on average) batches of data to sample from the replay buffer take after batch from the environment (default: 4) + + replay_start: int, the sampling from the replay buffer does not start until replay buffer has at least that many samples (default: 10k) + + c: float, importance weight clipping factor (default: 10) + + trust_region bool, whether or not algorithms estimates the gradient KL divergence between the old and updated policy and uses it to determine step size (default: True) + + delta: float, max KL divergence between the old policy and updated policy (default: 1) + + alpha: float, momentum factor in the Polyak (exponential moving average) averaging of the model parameters (default: 0.99) + + **network_kwargs: keyword arguments to the policy / network builder. See baselines.common/policies.py/build_policy and arguments to a particular type of network + For instance, 'mlp' network architecture has arguments num_hidden and num_layers. 
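+
+    Example (a minimal, illustrative sketch -- the env id and number of parallel
+    environments are arbitrary choices, not recommended settings):
+
+        from baselines.common.cmd_util import make_atari_env
+        from baselines.acer.acer import learn
+        env = make_atari_env('PongNoFrameskip-v4', num_env=4, seed=0)
+        model = learn(network='cnn', env=env, total_timesteps=int(1e6))
+
+    Returns:
+    -------
+
+    trained Model object, whose step method can be used to act in the environment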
+ + ''' + + print("Running Acer Simple") + print(locals()) + set_global_seeds(seed) + policy = build_policy(env, network, estimate_q=True, **network_kwargs) + + nenvs = env.num_envs + ob_space = env.observation_space + ac_space = env.action_space + num_procs = len(env.remotes) if hasattr(env, 'remotes') else 1# HACK + model = Model(policy=policy, ob_space=ob_space, ac_space=ac_space, nenvs=nenvs, nsteps=nsteps, nstack=nstack, + num_procs=num_procs, ent_coef=ent_coef, q_coef=q_coef, gamma=gamma, + max_grad_norm=max_grad_norm, lr=lr, rprop_alpha=rprop_alpha, rprop_epsilon=rprop_epsilon, + total_timesteps=total_timesteps, lrschedule=lrschedule, c=c, + trust_region=trust_region, alpha=alpha, delta=delta) + + runner = Runner(env=env, model=model, nsteps=nsteps, nstack=nstack) + if replay_ratio > 0: + buffer = Buffer(env=env, nsteps=nsteps, nstack=nstack, size=buffer_size) + else: + buffer = None + nbatch = nenvs*nsteps + acer = Acer(runner, model, buffer, log_interval) + acer.tstart = time.time() + + for acer.steps in range(0, total_timesteps, nbatch): #nbatch samples, 1 on_policy call and multiple off-policy calls + acer.call(on_policy=True) + if replay_ratio > 0 and buffer.has_atleast(replay_start): + n = np.random.poisson(replay_ratio) + for _ in range(n): + acer.call(on_policy=False) # no simulation steps in this + + env.close() + return model diff --git a/baselines/acktr/acktr.py b/baselines/acktr/acktr.py new file mode 100644 index 0000000..97090b4 --- /dev/null +++ b/baselines/acktr/acktr.py @@ -0,0 +1 @@ +from baselines.acktr.acktr_disc import * diff --git a/baselines/common/models.py b/baselines/common/models.py new file mode 100644 index 0000000..0763095 --- /dev/null +++ b/baselines/common/models.py @@ -0,0 +1,177 @@ +import numpy as np +import tensorflow as tf +from baselines.a2c import utils +from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch +from baselines.common.mpi_running_mean_std import RunningMeanStd +import tensorflow.contrib.layers as layers + + +def nature_cnn(unscaled_images, **conv_kwargs): + """ + CNN from Nature paper. + """ + scaled_images = tf.cast(unscaled_images, tf.float32) / 255. + activ = tf.nn.relu + h = activ(conv(scaled_images, 'c1', nf=32, rf=8, stride=4, init_scale=np.sqrt(2), + **conv_kwargs)) + h2 = activ(conv(h, 'c2', nf=64, rf=4, stride=2, init_scale=np.sqrt(2), **conv_kwargs)) + h3 = activ(conv(h2, 'c3', nf=64, rf=3, stride=1, init_scale=np.sqrt(2), **conv_kwargs)) + h3 = conv_to_fc(h3) + return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2))) + + +def mlp(num_layers=2, num_hidden=64, activation=tf.tanh): + """ + Simple fully connected layer policy. Separate stacks of fully-connected layers are used for policy and value function estimation. + More customized fully-connected policies can be obtained by using PolicyWithV class directly. 
+ + Parameters: + ---------- + + num_layers: int number of fully-connected layers (default: 2) + + num_hidden: int size of fully-connected layers (default: 64) + + activation: activation function (default: tf.tanh) + + Returns: + ------- + + function that builds fully connected network with a given input placeholder + """ + def network_fn(X): + h = tf.layers.flatten(X) + for i in range(num_layers): + h = activation(fc(h, 'mlp_fc{}'.format(i), nh=num_hidden, init_scale=np.sqrt(2))) + return h, None + + return network_fn + + +def cnn(**conv_kwargs): + def network_fn(X): + return nature_cnn(X, **conv_kwargs), None + return network_fn + +def cnn_small(**conv_kwargs): + def network_fn(X): + h = tf.cast(X, tf.float32) / 255. + + activ = tf.nn.relu + h = activ(conv(h, 'c1', nf=8, rf=8, stride=4, init_scale=np.sqrt(2), **conv_kwargs)) + h = activ(conv(h, 'c2', nf=16, rf=4, stride=2, init_scale=np.sqrt(2), **conv_kwargs)) + h = conv_to_fc(h) + h = activ(fc(h, 'fc1', nh=128, init_scale=np.sqrt(2))) + return h, None + return network_fn + + + +def lstm(nlstm=128, layer_norm=False): + def network_fn(X, nenv=1): + nbatch = X.shape[0] + nsteps = nbatch // nenv + + h = tf.layers.flatten(X) + + M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) + S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states + + xs = batch_to_seq(h, nenv, nsteps) + ms = batch_to_seq(M, nenv, nsteps) + + if layer_norm: + h5, snew = utils.lnlstm(xs, ms, S, scope='lnlstm', nh=nlstm) + else: + h5, snew = utils.lstm(xs, ms, S, scope='lstm', nh=nlstm) + + h = seq_to_batch(h5) + initial_state = np.zeros(S.shape.as_list(), dtype=float) + + return h, {'S':S, 'M':M, 'state':snew, 'initial_state':initial_state} + + return network_fn + + +def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs): + def network_fn(X, nenv=1): + nbatch = X.shape[0] + nsteps = nbatch // nenv + + h = nature_cnn(X, **conv_kwargs) + + M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1) + S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states + + xs = batch_to_seq(h, nenv, nsteps) + ms = batch_to_seq(M, nenv, nsteps) + + if layer_norm: + h5, snew = utils.lnlstm(xs, ms, S, scope='lnlstm', nh=nlstm) + else: + h5, snew = utils.lstm(xs, ms, S, scope='lstm', nh=nlstm) + + h = seq_to_batch(h5) + initial_state = np.zeros(S.shape.as_list(), dtype=float) + + return h, {'S':S, 'M':M, 'state':snew, 'initial_state':initial_state} + + return network_fn + +def cnn_lnlstm(nlstm=128, **conv_kwargs): + return cnn_lstm(nlstm, layer_norm=True, **conv_kwargs) + + +def conv_only(convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], **conv_kwargs): + ''' + convolutions-only net + + Parameters: + ---------- + + conv: list of triples (filter_number, filter_size, stride) specifying parameters for each layer. + + Returns: + + function that takes tensorflow tensor as input and returns the output of the last convolutional layer + + ''' + + def network_fn(X): + out = X + with tf.variable_scope("convnet"): + for num_outputs, kernel_size, stride in convs: + out = layers.convolution2d(out, + num_outputs=num_outputs, + kernel_size=kernel_size, + stride=stride, + activation_fn=tf.nn.relu, + **conv_kwargs) + + return out, None + return network_fn + +def _normalize_clip_observation(x, clip_range=[-5.0, 5.0]): + rms = RunningMeanStd(shape=x.shape[1:]) + norm_x = tf.clip_by_value((x - rms.mean) / rms.std, min(clip_range), max(clip_range)) + return norm_x, rms + + +def get_network_builder(name): + # TODO: replace with reflection? 
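+    # A reflection-based version (sketch only, not what this function currently
+    # does) could look the name up among this module's globals, e.g.:
+    #     fn = globals().get(name)
+    #     if callable(fn):
+    #         return fn
+    # For now the mapping is the explicit if/elif chain below.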
+ if name == 'cnn': + return cnn + elif name == 'cnn_small': + return cnn_small + elif name == 'conv_only': + return conv_only + elif name == 'mlp': + return mlp + elif name == 'lstm': + return lstm + elif name == 'cnn_lstm': + return cnn_lstm + elif name == 'cnn_lnlstm': + return cnn_lnlstm + else: + raise ValueError('Unknown network type: {}'.format(name)) diff --git a/baselines/common/mpi_adam_optimizer.py b/baselines/common/mpi_adam_optimizer.py new file mode 100644 index 0000000..8cf09c4 --- /dev/null +++ b/baselines/common/mpi_adam_optimizer.py @@ -0,0 +1,31 @@ +import numpy as np +import tensorflow as tf +from mpi4py import MPI + +class MpiAdamOptimizer(tf.train.AdamOptimizer): + """Adam optimizer that averages gradients across mpi processes.""" + def __init__(self, comm, **kwargs): + self.comm = comm + tf.train.AdamOptimizer.__init__(self, **kwargs) + def compute_gradients(self, loss, var_list, **kwargs): + grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs) + grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None] + flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) + shapes = [v.shape.as_list() for g, v in grads_and_vars] + sizes = [int(np.prod(s)) for s in shapes] + + num_tasks = self.comm.Get_size() + buf = np.zeros(sum(sizes), np.float32) + + def _collect_grads(flat_grad): + self.comm.Allreduce(flat_grad, buf, op=MPI.SUM) + np.divide(buf, float(num_tasks), out=buf) + return buf + + avg_flat_grad = tf.py_func(_collect_grads, [flat_grad], tf.float32) + avg_flat_grad.set_shape(flat_grad.shape) + avg_grads = tf.split(avg_flat_grad, sizes, axis=0) + avg_grads_and_vars = [(tf.reshape(g, v.shape), v) + for g, (_, v) in zip(avg_grads, grads_and_vars)] + + return avg_grads_and_vars diff --git a/baselines/common/mpi_util.py b/baselines/common/mpi_util.py new file mode 100644 index 0000000..f04187b --- /dev/null +++ b/baselines/common/mpi_util.py @@ -0,0 +1,101 @@ +from collections import defaultdict +from mpi4py import MPI +import os, numpy as np +import platform +import shutil +import subprocess + +def sync_from_root(sess, variables, comm=None): + """ + Send the root node's parameters to every worker. + Arguments: + sess: the TensorFlow session. + variables: all parameter variables including optimizer's + """ + if comm is None: comm = MPI.COMM_WORLD + rank = comm.Get_rank() + for var in variables: + if rank == 0: + comm.Bcast(sess.run(var)) + else: + import tensorflow as tf + returned_var = np.empty(var.shape, dtype='float32') + comm.Bcast(returned_var) + sess.run(tf.assign(var, returned_var)) + +def gpu_count(): + """ + Count the GPUs on this machine. + """ + if shutil.which('nvidia-smi') is None: + return 0 + output = subprocess.check_output(['nvidia-smi', '--query-gpu=gpu_name', '--format=csv']) + return max(0, len(output.split(b'\n')) - 2) + +def setup_mpi_gpus(): + """ + Set CUDA_VISIBLE_DEVICES using MPI. + """ + num_gpus = gpu_count() + if num_gpus == 0: + return + local_rank, _ = get_local_rank_size(MPI.COMM_WORLD) + os.environ['CUDA_VISIBLE_DEVICES'] = str(local_rank % num_gpus) + +def get_local_rank_size(comm): + """ + Returns the rank of each process on its machine + The processes on a given machine will be assigned ranks + 0, 1, 2, ..., N-1, + where N is the number of processes on this machine. 
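+    For example, with 8 processes spread evenly across 2 machines, the processes
+    on each machine get local ranks 0..3 and the returned local size is 4.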
+ + Useful if you want to assign one gpu per machine + """ + this_node = platform.node() + ranks_nodes = comm.allgather((comm.Get_rank(), this_node)) + node2rankssofar = defaultdict(int) + local_rank = None + for (rank, node) in ranks_nodes: + if rank == comm.Get_rank(): + local_rank = node2rankssofar[node] + node2rankssofar[node] += 1 + assert local_rank is not None + return local_rank, node2rankssofar[this_node] + +def share_file(comm, path): + """ + Copies the file from rank 0 to all other ranks + Puts it in the same place on all machines + """ + localrank, _ = get_local_rank_size(comm) + if comm.Get_rank() == 0: + with open(path, 'rb') as fh: + data = fh.read() + comm.bcast(data) + else: + data = comm.bcast(None) + if localrank == 0: + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'wb') as fh: + fh.write(data) + comm.Barrier() + +def dict_gather(comm, d, op='mean', assert_all_have_data=True): + if comm is None: return d + alldicts = comm.allgather(d) + size = comm.size + k2li = defaultdict(list) + for d in alldicts: + for (k,v) in d.items(): + k2li[k].append(v) + result = {} + for (k,li) in k2li.items(): + if assert_all_have_data: + assert len(li)==size, "only %i out of %i MPI workers have sent '%s'" % (len(li), size, k) + if op=='mean': + result[k] = np.mean(li, axis=0) + elif op=='sum': + result[k] = np.sum(li, axis=0) + else: + assert 0, op + return result diff --git a/baselines/common/policies.py b/baselines/common/policies.py new file mode 100644 index 0000000..f6f411e --- /dev/null +++ b/baselines/common/policies.py @@ -0,0 +1,190 @@ +import joblib +import os.path as osp +import tensorflow as tf +from baselines.a2c.utils import make_path, fc +from baselines.common.distributions import make_pdtype +from baselines.common.input import observation_placeholder, encode_observation +from baselines.common.tf_util import adjust_shape +from baselines.common.mpi_running_mean_std import RunningMeanStd +from baselines.common.models import get_network_builder + +import gym + + +class PolicyWithValue(object): + """ + Encapsulates fields and methods for RL policy and value function estimation with shared parameters + """ + + def __init__(self, env, observations, latent, estimate_q=False, vf_latent=None, sess=None, **tensors): + """ + Parameters: + ---------- + env RL environment + + observations tensorflow placeholder in which the observations will be fed + + latent latent state from which policy distribution parameters should be inferred + + vf_latent latent state from which value function should be inferred (if None, then latent is used) + + sess tensorflow session to run calculations in (if None, default session is used) + + **tensors tensorflow tensors for additional attributes such as state or mask + + """ + + self.X = observations + self.state = tf.constant([]) + self.initial_state = None + self.__dict__.update(tensors) + + vf_latent = vf_latent if vf_latent is not None else latent + + vf_latent = tf.layers.flatten(vf_latent) + latent = tf.layers.flatten(latent) + + self.pdtype = make_pdtype(env.action_space) + + self.pd, self.pi = self.pdtype.pdfromlatent(latent, init_scale=0.01) + + self.action = self.pd.sample() + self.neglogp = self.pd.neglogp(self.action) + self.sess = sess + + if estimate_q: + assert isinstance(env.action_space, gym.spaces.Discrete) + self.q = fc(vf_latent, 'q', env.action_space.n) + self.vf = self.q + else: + self.vf = fc(vf_latent, 'vf', 1) + self.vf = self.vf[:,0] + + def _evaluate(self, variables, observation, **extra_feed): + sess = 
self.sess or tf.get_default_session() + feed_dict = {self.X: adjust_shape(self.X, observation)} + for inpt_name, data in extra_feed.items(): + if inpt_name in self.__dict__.keys(): + inpt = self.__dict__[inpt_name] + if isinstance(inpt, tf.Tensor) and inpt._op.type == 'Placeholder': + feed_dict[inpt] = adjust_shape(inpt, data) + + return sess.run(variables, feed_dict) + + def step(self, observation, **extra_feed): + """ + Compute next action(s) given the observaion(s) + + Parameters: + ---------- + + observation observation data (either single or a batch) + + **extra_feed additional data such as state or mask (names of the arguments should match the ones in constructor, see __init__) + + Returns: + ------- + (action, value estimate, next state, negative log likelihood of the action under current policy parameters) tuple + """ + + a, v, state, neglogp = self._evaluate([self.action, self.vf, self.state, self.neglogp], observation, **extra_feed) + if state.size == 0: + state = None + return a, v, state, neglogp + + def value(self, ob, *args, **kwargs): + """ + Compute value estimate(s) given the observaion(s) + + Parameters: + ---------- + + observation observation data (either single or a batch) + + **extra_feed additional data such as state or mask (names of the arguments should match the ones in constructor, see __init__) + + Returns: + ------- + value estimate + """ + return self._evaluate(self.vf, ob, *args, **kwargs) + + def save(self, save_path): + sess = self.sess or tf.get_default_session() + params = tf.trainable_variables() + ps = sess.run(params) + make_path(osp.dirname(save_path)) + joblib.dump(ps, save_path) + + def load(self, load_path): + sess = self.sess or tf.get_default_session() + params = tf.trainable_variables() + loaded_params = joblib.load(load_path) + restores = [] + for p, loaded_p in zip(params, loaded_params): + restores.append(p.assign(loaded_p)) + sess.run(restores) + +def build_policy(env, policy_network, value_network=None, normalize_observations=False, estimate_q=False, **policy_kwargs): + if isinstance(policy_network, str): + network_type = policy_network + policy_network = get_network_builder(network_type)(**policy_kwargs) + + def policy_fn(nbatch=None, nsteps=None, sess=None, observ_placeholder=None): + ob_space = env.observation_space + + X = observ_placeholder if observ_placeholder is not None else observation_placeholder(ob_space, batch_size=nbatch) + + extra_tensors = {} + + if normalize_observations and X.dtype == tf.float32: + encoded_x, rms = _normalize_clip_observation(X) + extra_tensors['rms'] = rms + else: + encoded_x = X + + encoded_x = encode_observation(ob_space, encoded_x) + + with tf.variable_scope('pi', reuse=tf.AUTO_REUSE): + policy_latent, recurrent_tensors = policy_network(encoded_x) + + if recurrent_tensors is not None: + # recurrent architecture, need a few more steps + nenv = nbatch // nsteps + assert nenv > 0, 'Bad input for recurrent policy: batch size {} smaller than nsteps {}'.format(nbatch, nsteps) + policy_latent, recurrent_tensors = policy_network(encoded_x, nenv) + extra_tensors.update(recurrent_tensors) + + + _v_net = value_network + + if _v_net is None or _v_net == 'shared': + vf_latent = policy_latent + else: + if _v_net == 'copy': + _v_net = policy_network + else: + assert callable(_v_net) + + with tf.variable_scope('vf', reuse=tf.AUTO_REUSE): + vf_latent, _ = _v_net(encoded_x) + + policy = PolicyWithValue( + env=env, + observations=X, + latent=policy_latent, + vf_latent=vf_latent, + sess=sess, + estimate_q=estimate_q, + 
**extra_tensors + ) + return policy + + return policy_fn + + +def _normalize_clip_observation(x, clip_range=[-5.0, 5.0]): + rms = RunningMeanStd(shape=x.shape[1:]) + norm_x = tf.clip_by_value((x - rms.mean) / rms.std, min(clip_range), max(clip_range)) + return norm_x, rms + diff --git a/baselines/common/retro_wrappers.py b/baselines/common/retro_wrappers.py new file mode 100644 index 0000000..3eb2eb3 --- /dev/null +++ b/baselines/common/retro_wrappers.py @@ -0,0 +1,293 @@ + # flake8: noqa F403, F405 +from .atari_wrappers import * +import numpy as np +import gym + +class TimeLimit(gym.Wrapper): + def __init__(self, env, max_episode_steps=None): + super(TimeLimit, self).__init__(env) + self._max_episode_steps = max_episode_steps + self._elapsed_steps = 0 + + def step(self, ac): + observation, reward, done, info = self.env.step(ac) + self._elapsed_steps += 1 + if self._elapsed_steps >= self._max_episode_steps: + done = True + info['TimeLimit.truncated'] = True + return observation, reward, done, info + + def reset(self, **kwargs): + self._elapsed_steps = 0 + return self.env.reset(**kwargs) + +class StochasticFrameSkip(gym.Wrapper): + def __init__(self, env, n, stickprob): + gym.Wrapper.__init__(self, env) + self.n = n + self.stickprob = stickprob + self.curac = None + self.rng = np.random.RandomState() + self.supports_want_render = hasattr(env, "supports_want_render") + + def reset(self, **kwargs): + self.curac = None + return self.env.reset(**kwargs) + + def step(self, ac): + done = False + totrew = 0 + for i in range(self.n): + # First step after reset, use action + if self.curac is None: + self.curac = ac + # First substep, delay with probability=stickprob + elif i==0: + if self.rng.rand() > self.stickprob: + self.curac = ac + # Second substep, new action definitely kicks in + elif i==1: + self.curac = ac + if self.supports_want_render and i self.channel + for _ in range(self.k): + self.frames.append(ob) + return self._get_ob() + + def step(self, ac): + ob, reward, done, info = self.env.step(ac) + self.frames.append(ob) + return self._get_ob(), reward, done, info + + def _get_ob(self): + assert len(self.frames) == self.k + return np.concatenate([frame if i==self.k-1 else frame[:,:,self.channel:self.channel+1] + for (i, frame) in enumerate(self.frames)], axis=2) + +class Downsample(gym.ObservationWrapper): + def __init__(self, env, ratio): + """ + Downsample images by a factor of ratio + """ + gym.ObservationWrapper.__init__(self, env) + (oldh, oldw, oldc) = env.observation_space.shape + newshape = (oldh//ratio, oldw//ratio, oldc) + self.observation_space = spaces.Box(low=0, high=255, + shape=newshape, dtype=np.uint8) + + def observation(self, frame): + height, width, _ = self.observation_space.shape + frame = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA) + if frame.ndim == 2: + frame = frame[:,:,None] + return frame + +class Rgb2gray(gym.ObservationWrapper): + def __init__(self, env): + """ + Downsample images by a factor of ratio + """ + gym.ObservationWrapper.__init__(self, env) + (oldh, oldw, _oldc) = env.observation_space.shape + self.observation_space = spaces.Box(low=0, high=255, + shape=(oldh, oldw, 1), dtype=np.uint8) + + def observation(self, frame): + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) + return frame[:,:,None] + + +class MovieRecord(gym.Wrapper): + def __init__(self, env, savedir, k): + gym.Wrapper.__init__(self, env) + self.savedir = savedir + self.k = k + self.epcount = 0 + def reset(self): + if self.epcount % self.k == 0: + print('saving movie 
this episode', self.savedir) + self.env.unwrapped.movie_path = self.savedir + else: + print('not saving this episode') + self.env.unwrapped.movie_path = None + self.env.unwrapped.movie = None + self.epcount += 1 + return self.env.reset() + +class AppendTimeout(gym.Wrapper): + def __init__(self, env): + gym.Wrapper.__init__(self, env) + self.action_space = env.action_space + self.timeout_space = gym.spaces.Box(low=np.array([0.0]), high=np.array([1.0]), dtype=np.float32) + self.original_os = env.observation_space + if isinstance(self.original_os, gym.spaces.Dict): + import copy + ordered_dict = copy.deepcopy(self.original_os.spaces) + ordered_dict['value_estimation_timeout'] = self.timeout_space + self.observation_space = gym.spaces.Dict(ordered_dict) + self.dict_mode = True + else: + self.observation_space = gym.spaces.Dict({ + 'original': self.original_os, + 'value_estimation_timeout': self.timeout_space + }) + self.dict_mode = False + self.ac_count = None + while 1: + if not hasattr(env, "_max_episode_steps"): # Looking for TimeLimit wrapper that has this field + env = env.env + continue + break + self.timeout = env._max_episode_steps + + def step(self, ac): + self.ac_count += 1 + ob, rew, done, info = self.env.step(ac) + return self._process(ob), rew, done, info + + def reset(self): + self.ac_count = 0 + return self._process(self.env.reset()) + + def _process(self, ob): + fracmissing = 1 - self.ac_count / self.timeout + if self.dict_mode: + ob['value_estimation_timeout'] = fracmissing + else: + return { 'original': ob, 'value_estimation_timeout': fracmissing } + +class StartDoingRandomActionsWrapper(gym.Wrapper): + """ + Warning: can eat info dicts, not good if you depend on them + """ + def __init__(self, env, max_random_steps, on_startup=True, every_episode=False): + gym.Wrapper.__init__(self, env) + self.on_startup = on_startup + self.every_episode = every_episode + self.random_steps = max_random_steps + self.last_obs = None + if on_startup: + self.some_random_steps() + + def some_random_steps(self): + self.last_obs = self.env.reset() + n = np.random.randint(self.random_steps) + #print("running for random %i frames" % n) + for _ in range(n): + self.last_obs, _, done, _ = self.env.step(self.env.action_space.sample()) + if done: self.last_obs = self.env.reset() + + def reset(self): + return self.last_obs + + def step(self, a): + self.last_obs, rew, done, info = self.env.step(a) + if done: + self.last_obs = self.env.reset() + if self.every_episode: + self.some_random_steps() + return self.last_obs, rew, done, info + +def make_retro(*, game, state, max_episode_steps, **kwargs): + import retro + env = retro.make(game, state, **kwargs) + env = StochasticFrameSkip(env, n=4, stickprob=0.25) + if max_episode_steps is not None: + env = TimeLimit(env, max_episode_steps=max_episode_steps) + return env + +def wrap_deepmind_retro(env, scale=True, frame_stack=4): + """ + Configure environment for retro games, using config similar to DeepMind-style Atari in wrap_deepmind + """ + env = WarpFrame(env) + env = ClipRewardEnv(env) + env = FrameStack(env, frame_stack) + if scale: + env = ScaledFloatFrame(env) + return env + +class SonicDiscretizer(gym.ActionWrapper): + """ + Wrap a gym-retro environment and make it use discrete + actions for the Sonic game. 
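+    Illustrative usage (the game/state names are examples only):
+
+        env = retro.make(game='SonicTheHedgehog-Genesis', state='GreenHillZone.Act1')
+        env = SonicDiscretizer(env)   # action_space becomes Discrete(7)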
+ """ + def __init__(self, env): + super(SonicDiscretizer, self).__init__(env) + buttons = ["B", "A", "MODE", "START", "UP", "DOWN", "LEFT", "RIGHT", "C", "Y", "X", "Z"] + actions = [['LEFT'], ['RIGHT'], ['LEFT', 'DOWN'], ['RIGHT', 'DOWN'], ['DOWN'], + ['DOWN', 'B'], ['B']] + self._actions = [] + for action in actions: + arr = np.array([False] * 12) + for button in action: + arr[buttons.index(button)] = True + self._actions.append(arr) + self.action_space = gym.spaces.Discrete(len(self._actions)) + + def action(self, a): # pylint: disable=W0221 + return self._actions[a].copy() + +class RewardScaler(gym.RewardWrapper): + """ + Bring rewards to a reasonable scale for PPO. + This is incredibly important and effects performance + drastically. + """ + def __init__(self, env, scale=0.01): + super(RewardScaler, self).__init__(env) + self.scale = scale + + def reward(self, reward): + return reward * self.scale + +class AllowBacktracking(gym.Wrapper): + """ + Use deltas in max(X) as the reward, rather than deltas + in X. This way, agents are not discouraged too heavily + from exploring backwards if there is no way to advance + head-on in the level. + """ + def __init__(self, env): + super(AllowBacktracking, self).__init__(env) + self._cur_x = 0 + self._max_x = 0 + + def reset(self, **kwargs): # pylint: disable=E0202 + self._cur_x = 0 + self._max_x = 0 + return self.env.reset(**kwargs) + + def step(self, action): # pylint: disable=E0202 + obs, rew, done, info = self.env.step(action) + self._cur_x += rew + rew = max(0, self._cur_x - self._max_x) + self._max_x = max(self._max_x, self._cur_x) + return obs, rew, done, info diff --git a/baselines/common/tests/__init__.py b/baselines/common/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/baselines/common/tests/envs/__init__.py b/baselines/common/tests/envs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/baselines/common/tests/envs/fixed_sequence_env.py b/baselines/common/tests/envs/fixed_sequence_env.py new file mode 100644 index 0000000..9f1b03d --- /dev/null +++ b/baselines/common/tests/envs/fixed_sequence_env.py @@ -0,0 +1,44 @@ +import numpy as np +from gym import Env +from gym.spaces import Discrete + + +class FixedSequenceEnv(Env): + def __init__( + self, + n_actions=10, + seed=0, + episode_len=100 + ): + self.np_random = np.random.RandomState() + self.np_random.seed(seed) + self.sequence = [self.np_random.randint(0, n_actions-1) for _ in range(episode_len)] + + self.action_space = Discrete(n_actions) + self.observation_space = Discrete(1) + + self.episode_len = episode_len + self.time = 0 + self.reset() + + def reset(self): + self.time = 0 + return 0 + + def step(self, actions): + rew = self._get_reward(actions) + self._choose_next_state() + done = False + if self.episode_len and self.time >= self.episode_len: + rew = 0 + done = True + + return 0, rew, done, {} + + def _choose_next_state(self): + self.time += 1 + + def _get_reward(self, actions): + return 1 if actions == self.sequence[self.time] else 0 + + diff --git a/baselines/common/tests/envs/identity_env.py b/baselines/common/tests/envs/identity_env.py new file mode 100644 index 0000000..005d3ff --- /dev/null +++ b/baselines/common/tests/envs/identity_env.py @@ -0,0 +1,70 @@ +import numpy as np +from abc import abstractmethod +from gym import Env +from gym.spaces import Discrete, Box + + +class IdentityEnv(Env): + def __init__( + self, + episode_len=None + ): + + self.episode_len = episode_len + self.time = 0 + self.reset() + + def reset(self): + 
self._choose_next_state() + self.time = 0 + self.observation_space = self.action_space + + return self.state + + def step(self, actions): + rew = self._get_reward(actions) + self._choose_next_state() + done = False + if self.episode_len and self.time >= self.episode_len: + rew = 0 + done = True + + return self.state, rew, done, {} + + def _choose_next_state(self): + self.state = self.action_space.sample() + self.time += 1 + + @abstractmethod + def _get_reward(self, actions): + raise NotImplementedError + + +class DiscreteIdentityEnv(IdentityEnv): + def __init__( + self, + dim, + episode_len=None, + ): + + self.action_space = Discrete(dim) + super().__init__(episode_len=episode_len) + + def _get_reward(self, actions): + return 1 if self.state == actions else 0 + + +class BoxIdentityEnv(IdentityEnv): + def __init__( + self, + shape, + episode_len=None, + ): + + self.action_space = Box(low=-1.0, high=1.0, shape=shape) + super().__init__(episode_len=episode_len) + + def _get_reward(self, actions): + diff = actions - self.state + diff = diff[:] + return -0.5 * np.dot(diff, diff) diff --git a/baselines/common/tests/envs/mnist_env.py b/baselines/common/tests/envs/mnist_env.py new file mode 100644 index 0000000..563e215 --- /dev/null +++ b/baselines/common/tests/envs/mnist_env.py @@ -0,0 +1,70 @@ +import os.path as osp +import numpy as np +import tempfile +import filelock +from gym import Env +from gym.spaces import Discrete, Box + + + +class MnistEnv(Env): + def __init__( + self, + seed=0, + episode_len=None, + no_images=None + ): + from tensorflow.examples.tutorials.mnist import input_data + # we could use temporary directory for this with a context manager and + # TemporaryDirecotry, but then each test that uses mnist would re-download the data + # this way the data is not cleaned up, but we only download it once per machine + mnist_path = osp.join(tempfile.gettempdir(), 'MNIST_data') + with filelock.FileLock(mnist_path + '.lock'): + self.mnist = input_data.read_data_sets(mnist_path) + + self.np_random = np.random.RandomState() + self.np_random.seed(seed) + + self.observation_space = Box(low=0.0, high=1.0, shape=(28,28,1)) + self.action_space = Discrete(10) + self.episode_len = episode_len + self.time = 0 + self.no_images = no_images + + self.train_mode() + self.reset() + + def reset(self): + self._choose_next_state() + self.time = 0 + + return self.state[0] + + def step(self, actions): + rew = self._get_reward(actions) + self._choose_next_state() + done = False + if self.episode_len and self.time >= self.episode_len: + rew = 0 + done = True + + return self.state[0], rew, done, {} + + def train_mode(self): + self.dataset = self.mnist.train + + def test_mode(self): + self.dataset = self.mnist.test + + def _choose_next_state(self): + max_index = (self.no_images if self.no_images is not None else self.dataset.num_examples) - 1 + index = self.np_random.randint(0, max_index) + image = self.dataset.images[index].reshape(28,28,1)*255 + label = self.dataset.labels[index] + self.state = (image, label) + self.time += 1 + + def _get_reward(self, actions): + return 1 if self.state[1] == actions else 0 + + diff --git a/baselines/common/tests/test_cartpole.py b/baselines/common/tests/test_cartpole.py new file mode 100644 index 0000000..3c33b8f --- /dev/null +++ b/baselines/common/tests/test_cartpole.py @@ -0,0 +1,45 @@ +import pytest +import gym + +from baselines.run import get_learn_function +from baselines.common.tests.util import reward_per_episode_test + +common_kwargs = dict( + total_timesteps=30000, + 
network='mlp', + gamma=1.0, + seed=0, +) + +learn_kwargs = { + 'a2c' : dict(nsteps=32, value_network='copy', lr=0.05), + 'acktr': dict(nsteps=32, value_network='copy'), + 'deepq': {}, + 'ppo2': dict(value_network='copy'), + 'trpo_mpi': {} +} + + +@pytest.mark.slow +@pytest.mark.parametrize("alg", learn_kwargs.keys()) +def test_cartpole(alg): + ''' + Test if the algorithm (with an mlp policy) + can learn to balance the cartpole + ''' + + kwargs = common_kwargs.copy() + kwargs.update(learn_kwargs[alg]) + + learn_fn = lambda e: get_learn_function(alg)(env=e, **kwargs) + def env_fn(): + + env = gym.make('CartPole-v0') + env.seed(0) + return env + + reward_per_episode_test(env_fn, learn_fn, 100) + + +if __name__ == '__main__': + test_cartpole('a2c') diff --git a/baselines/common/tests/test_fixed_sequence.py b/baselines/common/tests/test_fixed_sequence.py new file mode 100644 index 0000000..f4a7f0c --- /dev/null +++ b/baselines/common/tests/test_fixed_sequence.py @@ -0,0 +1,56 @@ +import pytest +from baselines.common.tests.envs.fixed_sequence_env import FixedSequenceEnv + +from baselines.common.tests.util import simple_test +from baselines.run import get_learn_function + +common_kwargs = dict( + seed=0, + total_timesteps=50000, +) + +learn_kwargs = { + 'a2c': {}, + 'ppo2': dict(nsteps=10, ent_coef=0.0, nminibatches=1), + # TODO enable sequential models for trpo_mpi (proper handling of nbatch and nsteps) + # github issue: https://github.com/openai/baselines/issues/188 + # 'trpo_mpi': lambda e, p: trpo_mpi.learn(policy_fn=p(env=e), env=e, max_timesteps=30000, timesteps_per_batch=100, cg_iters=10, gamma=0.9, lam=1.0, max_kl=0.001) +} + + +alg_list = learn_kwargs.keys() +rnn_list = ['lstm'] + +@pytest.mark.slow +@pytest.mark.parametrize("alg", alg_list) +@pytest.mark.parametrize("rnn", rnn_list) +def test_fixed_sequence(alg, rnn): + ''' + Test if the algorithm (with a given policy) + can learn an identity transformation (i.e. return observation as an action) + ''' + + kwargs = learn_kwargs[alg] + kwargs.update(common_kwargs) + + #rnn_fn = rnn_fns[rnn] + #nlstm=256 + #rnn_fn = partial(rnn_fn, scope='lstm', nh=nlstm) + + episode_len = 5 + env_fn = lambda: FixedSequenceEnv(10, episode_len=episode_len) + learn = lambda e: get_learn_function(alg)( + env=e, + # policy=recurrent(env=e, rnn_fn=rnn_fn, input_embedding_fn=lambda _:_, state_shape=[2*nlstm]), + network=rnn, + **kwargs + ) + + simple_test(env_fn, learn, 0.7) + + +if __name__ == '__main__': + test_fixed_sequence('ppo2', 'lstm') + + + diff --git a/baselines/common/tests/test_identity.py b/baselines/common/tests/test_identity.py new file mode 100644 index 0000000..71d5a3e --- /dev/null +++ b/baselines/common/tests/test_identity.py @@ -0,0 +1,55 @@ +import pytest +from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv, BoxIdentityEnv +from baselines.run import get_learn_function +from baselines.common.tests.util import simple_test + +common_kwargs = dict( + total_timesteps=30000, + network='mlp', + gamma=0.9, + seed=0, +) + +learn_kwargs = { + 'a2c' : {}, + 'acktr': {}, + 'deepq': {}, + 'ppo2': dict(lr=1e-3, nsteps=64, ent_coef=0.0), + 'trpo_mpi': dict(timesteps_per_batch=100, cg_iters=10, gamma=0.9, lam=1.0, max_kl=0.01) +} + + +@pytest.mark.slow +@pytest.mark.parametrize("alg", learn_kwargs.keys()) +def test_discrete_identity(alg): + ''' + Test if the algorithm (with an mlp policy) + can learn an identity transformation (i.e. 
return observation as an action) + ''' + + kwargs = learn_kwargs[alg] + kwargs.update(common_kwargs) + + learn_fn = lambda e: get_learn_function(alg)(env=e, **kwargs) + env_fn = lambda: DiscreteIdentityEnv(10, episode_len=100) + simple_test(env_fn, learn_fn, 0.9) + +@pytest.mark.slow +@pytest.mark.parametrize("alg", ['a2c', 'ppo2', 'trpo_mpi']) +def test_continuous_identity(alg): + ''' + Test if the algorithm (with an mlp policy) + can learn an identity transformation (i.e. return observation as an action) + to a required precision + ''' + + kwargs = learn_kwargs[alg] + kwargs.update(common_kwargs) + learn_fn = lambda e: get_learn_function(alg)(env=e, **kwargs) + + env_fn = lambda: BoxIdentityEnv((1,), episode_len=100) + simple_test(env_fn, learn_fn, -0.1) + +if __name__ == '__main__': + test_continuous_identity('a2c') + diff --git a/baselines/common/tests/test_mnist.py b/baselines/common/tests/test_mnist.py new file mode 100644 index 0000000..5489c3a --- /dev/null +++ b/baselines/common/tests/test_mnist.py @@ -0,0 +1,50 @@ +import pytest + +# from baselines.acer import acer_simple as acer +from baselines.common.tests.envs.mnist_env import MnistEnv +from baselines.common.tests.util import simple_test +from baselines.run import get_learn_function + + +# TODO investigate a2c and ppo2 failures - is it due to bad hyperparameters for this problem? +# GitHub issue https://github.com/openai/baselines/issues/189 +common_kwargs = { + 'seed': 0, + 'network':'cnn', + 'gamma':0.9, + 'pad':'SAME' +} + +learn_args = { + 'a2c': dict(total_timesteps=50000), + # TODO need to resolve inference (step) API differences for acer; also slow + # 'acer': dict(seed=0, total_timesteps=1000), + 'deepq': dict(total_timesteps=5000), + 'acktr': dict(total_timesteps=30000), + 'ppo2': dict(total_timesteps=50000, lr=1e-3, nsteps=128, ent_coef=0.0), + 'trpo_mpi': dict(total_timesteps=80000, timesteps_per_batch=100, cg_iters=10, lam=1.0, max_kl=0.001) +} + + +#tests pass, but are too slow on travis. Same algorithms are covered +# by other tests with less compute-hungry nn's and by benchmarks +@pytest.mark.skip +@pytest.mark.slow +@pytest.mark.parametrize("alg", learn_args.keys()) +def test_mnist(alg): + ''' + Test if the algorithm can learn to classify MNIST digits. + Uses CNN policy. 
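+    The 0.6 threshold passed to simple_test corresponds to roughly 60% digit
+    classification accuracy, since MnistEnv gives reward 1 for a correct label
+    and 0 otherwise.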
+ ''' + + learn_kwargs = learn_args[alg] + learn_kwargs.update(common_kwargs) + + learn = get_learn_function(alg) + learn_fn = lambda e: learn(env=e, **learn_kwargs) + env_fn = lambda: MnistEnv(seed=0, episode_len=100) + + simple_test(env_fn, learn_fn, 0.6) + +if __name__ == '__main__': + test_mnist('deepq') diff --git a/baselines/common/tests/test_serialization.py b/baselines/common/tests/test_serialization.py new file mode 100644 index 0000000..341b60b --- /dev/null +++ b/baselines/common/tests/test_serialization.py @@ -0,0 +1,84 @@ +import os +import tempfile +import pytest +import tensorflow as tf +import numpy as np + +from baselines.common.tests.envs.mnist_env import MnistEnv +from baselines.common.vec_env.dummy_vec_env import DummyVecEnv +from baselines.run import get_learn_function +from baselines.common.tf_util import make_session + +from functools import partial + + +learn_kwargs = { + 'a2c': {}, + 'acktr': {}, + 'ppo2': {'nminibatches': 1, 'nsteps': 10}, + 'trpo_mpi': {}, +} + +network_kwargs = { + 'mlp': {}, + 'cnn': {'pad': 'SAME'}, + 'lstm': {}, + 'cnn_lnlstm': {'pad': 'SAME'} +} + + +@pytest.mark.parametrize("learn_fn", learn_kwargs.keys()) +@pytest.mark.parametrize("network_fn", network_kwargs.keys()) +def test_serialization(learn_fn, network_fn): + ''' + Test if the trained model can be serialized + ''' + + + if network_fn.endswith('lstm') and learn_fn in ['acktr', 'trpo_mpi', 'deepq']: + # TODO make acktr work with recurrent policies + # and test + # github issue: https://github.com/openai/baselines/issues/194 + return + + env = DummyVecEnv([lambda: MnistEnv(10, episode_len=100)]) + ob = env.reset() + learn = get_learn_function(learn_fn) + + kwargs = {} + kwargs.update(network_kwargs[network_fn]) + kwargs.update(learn_kwargs[learn_fn]) + + + learn = partial(learn, env=env, network=network_fn, seed=None, **kwargs) + + with tempfile.TemporaryDirectory() as td: + model_path = os.path.join(td, 'serialization_test_model') + + with tf.Graph().as_default(), make_session().as_default(): + model = learn(total_timesteps=100) + model.save(model_path) + mean1, std1 = _get_action_stats(model, ob) + + with tf.Graph().as_default(), make_session().as_default(): + model = learn(total_timesteps=10) + model.load(model_path) + mean2, std2 = _get_action_stats(model, ob) + + np.testing.assert_allclose(mean1, mean2, atol=0.5) + np.testing.assert_allclose(std1, std2, atol=0.5) + + + +def _get_action_stats(model, ob): + ntrials = 1000 + if model.initial_state is None or model.initial_state == []: + actions = np.array([model.step(ob)[0] for _ in range(ntrials)]) + else: + actions = np.array([model.step(ob, S=model.initial_state, M=[False])[0] for _ in range(ntrials)]) + + mean = np.mean(actions, axis=0) + std = np.std(actions, axis=0) + + return mean, std + diff --git a/baselines/common/tests/util.py b/baselines/common/tests/util.py new file mode 100644 index 0000000..8cf7e83 --- /dev/null +++ b/baselines/common/tests/util.py @@ -0,0 +1,73 @@ +import tensorflow as tf +import numpy as np +from gym.spaces import np_random +from baselines.common.vec_env.dummy_vec_env import DummyVecEnv + +N_TRIALS = 10000 +N_EPISODES = 100 + +def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS): + np.random.seed(0) + np_random.seed(0) + + env = DummyVecEnv([env_fn]) + + + with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default(): + tf.set_random_seed(0) + + model = learn_fn(env) + + sum_rew = 0 + done = True + + for i in range(n_trials): + if done: + 
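+                # new episode: reset the environment and re-initialize the
+                # recurrent state (if the policy has one)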
obs = env.reset() + state = model.initial_state + + if state is not None: + a, v, state, _ = model.step(obs, S=state, M=[False]) + else: + a, v, _, _ = model.step(obs) + + obs, rew, done, _ = env.step(a) + sum_rew += float(rew) + + print("Reward in {} trials is {}".format(n_trials, sum_rew)) + assert sum_rew > min_reward_fraction * n_trials, \ + 'sum of rewards {} is less than {} of the total number of trials {}'.format(sum_rew, min_reward_fraction, n_trials) + + +def reward_per_episode_test(env_fn, learn_fn, min_avg_reward, n_trials=N_EPISODES): + env = DummyVecEnv([env_fn]) + + with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default(): + model = learn_fn(env) + + done = True + N_TRIALS = 100 + + rewards = [] + + for i in range(N_TRIALS): + obs = env.reset() + state = model.initial_state + episode_rew = 0 + while True: + if state is not None: + a, v, state, _ = model.step(obs, S=state, M=[False]) + else: + a,v, _, _ = model.step(obs) + + obs, rew, done, _ = env.step(a) + episode_rew += rew + if done: + break + + rewards.append(episode_rew) + avg_rew = sum(rewards) / N_TRIALS + print("Average reward in {} episodes is {}".format(n_trials, avg_rew)) + assert avg_rew > min_avg_reward, \ + 'average reward in {} episodes ({}) is less than {}'.format(n_trials, avg_rew, min_avg_reward) + diff --git a/baselines/deepq/deepq.py b/baselines/deepq/deepq.py new file mode 100644 index 0000000..fc84f7c --- /dev/null +++ b/baselines/deepq/deepq.py @@ -0,0 +1,320 @@ +import os +import tempfile + +import tensorflow as tf +import zipfile +import cloudpickle +import numpy as np + +import baselines.common.tf_util as U +from baselines.common.tf_util import load_state, save_state +from baselines import logger +from baselines.common.schedules import LinearSchedule +from baselines.common import set_global_seeds + +from baselines import deepq +from baselines.deepq.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer +from baselines.deepq.utils import ObservationInput + +from baselines.common.tf_util import get_session +from baselines.deepq.models import build_q_func + + +class ActWrapper(object): + def __init__(self, act, act_params): + self._act = act + self._act_params = act_params + self.initial_state = None + + @staticmethod + def load(path): + with open(path, "rb") as f: + model_data, act_params = cloudpickle.load(f) + act = deepq.build_act(**act_params) + sess = tf.Session() + sess.__enter__() + with tempfile.TemporaryDirectory() as td: + arc_path = os.path.join(td, "packed.zip") + with open(arc_path, "wb") as f: + f.write(model_data) + + zipfile.ZipFile(arc_path, 'r', zipfile.ZIP_DEFLATED).extractall(td) + load_state(os.path.join(td, "model")) + + return ActWrapper(act, act_params) + + def __call__(self, *args, **kwargs): + return self._act(*args, **kwargs) + + def step(self, observation, **kwargs): + return self._act([observation], **kwargs), None, None, None + + def save(self, path=None): + """Save model to a pickle located at `path`""" + if path is None: + path = os.path.join(logger.get_dir(), "model.pkl") + + with tempfile.TemporaryDirectory() as td: + save_state(os.path.join(td, "model")) + arc_name = os.path.join(td, "packed.zip") + with zipfile.ZipFile(arc_name, 'w') as zipf: + for root, dirs, files in os.walk(td): + for fname in files: + file_path = os.path.join(root, fname) + if file_path != arc_name: + zipf.write(file_path, os.path.relpath(file_path, td)) + with open(arc_name, "rb") as f: + model_data = f.read() + with open(path, "wb") as f: + 
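+            # the pickle bundles the zipped checkpoint bytes together with the
+            # act-function parameters, so load() can rebuild the act function
+            # and restore its weights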
+            cloudpickle.dump((model_data, self._act_params), f)
+
+
+def load(path):
+    """Load act function that was returned by learn function.
+
+    Parameters
+    ----------
+    path: str
+        path to the act function pickle
+
+    Returns
+    -------
+    act: ActWrapper
+        function that takes a batch of observations
+        and returns actions.
+    """
+    return ActWrapper.load(path)
+
+
+def learn(env,
+          network,
+          seed=None,
+          lr=5e-4,
+          total_timesteps=100000,
+          buffer_size=50000,
+          exploration_fraction=0.1,
+          exploration_final_eps=0.02,
+          train_freq=1,
+          batch_size=32,
+          print_freq=100,
+          checkpoint_freq=10000,
+          checkpoint_path=None,
+          learning_starts=1000,
+          gamma=1.0,
+          target_network_update_freq=500,
+          prioritized_replay=False,
+          prioritized_replay_alpha=0.6,
+          prioritized_replay_beta0=0.4,
+          prioritized_replay_beta_iters=None,
+          prioritized_replay_eps=1e-6,
+          param_noise=False,
+          callback=None,
+          **network_kwargs
+          ):
+    """Train a deepq model.
+
+    Parameters
+    ----------
+    env: gym.Env
+        environment to train on
+    network: str or a function
+        the network model used to approximate the q-function; passed to
+        baselines.deepq.models.build_q_func together with **network_kwargs
+    seed: int or None
+        random seed, passed to set_global_seeds
+    lr: float
+        learning rate for adam optimizer
+    total_timesteps: int
+        number of env steps to optimize for
+    buffer_size: int
+        size of the replay buffer
+    exploration_fraction: float
+        fraction of entire training period over which the exploration rate is annealed
+    exploration_final_eps: float
+        final value of random action probability
+    train_freq: int
+        update the model every `train_freq` steps.
+    batch_size: int
+        size of a batch sampled from replay buffer for training
+    print_freq: int
+        how often to print out training progress
+        set to None to disable printing
+    checkpoint_freq: int
+        how often to save the model. This is so that the best version is restored
+        at the end of the training. If you do not wish to restore the best version at
+        the end of the training set this variable to None.
+    checkpoint_path: str
+        directory in which checkpoints are stored; if None a temporary directory is used
+    learning_starts: int
+        how many steps of the model to collect transitions for before learning starts
+    gamma: float
+        discount factor
+    target_network_update_freq: int
+        update the target network every `target_network_update_freq` steps.
+    prioritized_replay: bool
+        if True prioritized replay buffer will be used.
+    prioritized_replay_alpha: float
+        alpha parameter for prioritized replay buffer
+    prioritized_replay_beta0: float
+        initial value of beta for prioritized replay buffer
+    prioritized_replay_beta_iters: int
+        number of iterations over which beta will be annealed from initial value
+        to 1.0. If set to None, it is set to total_timesteps.
+    prioritized_replay_eps: float
+        epsilon to add to the TD errors when updating priorities.
+    param_noise: bool
+        whether to use parameter space noise for exploration instead of epsilon-greedy
+    callback: (locals, globals) -> None
+        function called at every step with the state of the algorithm.
+        If callback returns true training stops.
+    **network_kwargs
+        additional keyword arguments passed to the network builder
+
+    Returns
+    -------
+    act: ActWrapper
+        Wrapper over act function. Adds ability to save it and load it.
+        See header of baselines/deepq/categorical.py for details on the act function.
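+
+    Example
+    -------
+    A minimal, illustrative call (the hyperparameter values are placeholders,
+    not recommendations); `env` is assumed to be an already-constructed gym
+    environment with a discrete action space:
+
+        act = learn(env, network='mlp', total_timesteps=10000)
+        obs = env.reset()
+        action = act(obs[None])[0]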
+ """ + # Create all the functions necessary to train the model + + sess = get_session() + + set_global_seeds(seed) + + q_func = build_q_func(network, **network_kwargs) + + # capture the shape outside the closure so that the env object is not serialized + # by cloudpickle when serializing make_obs_ph + + def make_obs_ph(name): + return ObservationInput(env.observation_space, name=name) + + act, train, update_target, debug = deepq.build_train( + make_obs_ph=make_obs_ph, + q_func=q_func, + num_actions=env.action_space.n, + optimizer=tf.train.AdamOptimizer(learning_rate=lr), + gamma=gamma, + grad_norm_clipping=10, + param_noise=param_noise + ) + + act_params = { + 'make_obs_ph': make_obs_ph, + 'q_func': q_func, + 'num_actions': env.action_space.n, + } + + act = ActWrapper(act, act_params) + + # Create the replay buffer + if prioritized_replay: + replay_buffer = PrioritizedReplayBuffer(buffer_size, alpha=prioritized_replay_alpha) + if prioritized_replay_beta_iters is None: + prioritized_replay_beta_iters = total_timesteps + beta_schedule = LinearSchedule(prioritized_replay_beta_iters, + initial_p=prioritized_replay_beta0, + final_p=1.0) + else: + replay_buffer = ReplayBuffer(buffer_size) + beta_schedule = None + # Create the schedule for exploration starting from 1. + exploration = LinearSchedule(schedule_timesteps=int(exploration_fraction * total_timesteps), + initial_p=1.0, + final_p=exploration_final_eps) + + # Initialize the parameters and copy them to the target network. + U.initialize() + update_target() + + episode_rewards = [0.0] + saved_mean_reward = None + obs = env.reset() + reset = True + + with tempfile.TemporaryDirectory() as td: + td = checkpoint_path or td + + model_file = os.path.join(td, "model") + model_saved = False + if tf.train.latest_checkpoint(td) is not None: + load_state(model_file) + logger.log('Loaded model from {}'.format(model_file)) + model_saved = True + + for t in range(total_timesteps): + if callback is not None: + if callback(locals(), globals()): + break + # Take action and update exploration to the newest value + kwargs = {} + if not param_noise: + update_eps = exploration.value(t) + update_param_noise_threshold = 0. + else: + update_eps = 0. + # Compute the threshold such that the KL divergence between perturbed and non-perturbed + # policy is comparable to eps-greedy exploration with eps = exploration.value(t). + # See Appendix C.1 in Parameter Space Noise for Exploration, Plappert et al., 2017 + # for detailed explanation. + update_param_noise_threshold = -np.log(1. - exploration.value(t) + exploration.value(t) / float(env.action_space.n)) + kwargs['reset'] = reset + kwargs['update_param_noise_threshold'] = update_param_noise_threshold + kwargs['update_param_noise_scale'] = True + action = act(np.array(obs)[None], update_eps=update_eps, **kwargs)[0] + env_action = action + reset = False + new_obs, rew, done, _ = env.step(env_action) + # Store transition in the replay buffer. + replay_buffer.add(obs, action, rew, new_obs, float(done)) + obs = new_obs + + episode_rewards[-1] += rew + if done: + obs = env.reset() + episode_rewards.append(0.0) + reset = True + + if t > learning_starts and t % train_freq == 0: + # Minimize the error in Bellman's equation on a batch sampled from replay buffer. 
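+                # With prioritized replay, the batch is drawn according to TD-error-based
+                # priorities and reweighted with importance-sampling weights (annealed via
+                # beta_schedule); with the plain buffer, sampling is uniform and weights are 1.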
+ if prioritized_replay: + experience = replay_buffer.sample(batch_size, beta=beta_schedule.value(t)) + (obses_t, actions, rewards, obses_tp1, dones, weights, batch_idxes) = experience + else: + obses_t, actions, rewards, obses_tp1, dones = replay_buffer.sample(batch_size) + weights, batch_idxes = np.ones_like(rewards), None + td_errors = train(obses_t, actions, rewards, obses_tp1, dones, weights) + if prioritized_replay: + new_priorities = np.abs(td_errors) + prioritized_replay_eps + replay_buffer.update_priorities(batch_idxes, new_priorities) + + if t > learning_starts and t % target_network_update_freq == 0: + # Update target network periodically. + update_target() + + mean_100ep_reward = round(np.mean(episode_rewards[-101:-1]), 1) + num_episodes = len(episode_rewards) + if done and print_freq is not None and len(episode_rewards) % print_freq == 0: + logger.record_tabular("steps", t) + logger.record_tabular("episodes", num_episodes) + logger.record_tabular("mean 100 episode reward", mean_100ep_reward) + logger.record_tabular("% time spent exploring", int(100 * exploration.value(t))) + logger.dump_tabular() + + if (checkpoint_freq is not None and t > learning_starts and + num_episodes > 100 and t % checkpoint_freq == 0): + if saved_mean_reward is None or mean_100ep_reward > saved_mean_reward: + if print_freq is not None: + logger.log("Saving model due to mean reward increase: {} -> {}".format( + saved_mean_reward, mean_100ep_reward)) + save_state(model_file) + model_saved = True + saved_mean_reward = mean_100ep_reward + if model_saved: + if print_freq is not None: + logger.log("Restored model with mean reward: {}".format(saved_mean_reward)) + load_state(model_file) + + return act diff --git a/baselines/deepq/defaults.py b/baselines/deepq/defaults.py new file mode 100644 index 0000000..d41fb18 --- /dev/null +++ b/baselines/deepq/defaults.py @@ -0,0 +1,21 @@ +def atari(): + return dict( + network='conv_only', + lr=1e-4, + buffer_size=10000, + exploration_fraction=0.1, + exploration_final_eps=0.01, + train_freq=4, + learning_starts=10000, + target_network_update_freq=1000, + gamma=0.99, + prioritized_replay=True, + prioritized_replay_alpha=0.6, + checkpoint_freq=10000, + checkpoint_path=None, + dueling=True + ) + +def retro(): + return atari() + diff --git a/baselines/deepq/experiments/enjoy_retro.py b/baselines/deepq/experiments/enjoy_retro.py new file mode 100644 index 0000000..526af16 --- /dev/null +++ b/baselines/deepq/experiments/enjoy_retro.py @@ -0,0 +1,34 @@ +import argparse + +import numpy as np + +from baselines import deepq +from baselines.common import retro_wrappers + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--env', help='environment ID', default='SuperMarioBros-Nes') + parser.add_argument('--gamestate', help='game state to load', default='Level1-1') + parser.add_argument('--model', help='model pickle file from ActWrapper.save', default='model.pkl') + args = parser.parse_args() + + env = retro_wrappers.make_retro(game=args.env, state=args.gamestate, max_episode_steps=None) + env = retro_wrappers.wrap_deepmind_retro(env) + act = deepq.load(args.model) + + while True: + obs, done = env.reset(), False + episode_rew = 0 + while not done: + env.render() + action = act(obs[None])[0] + env_action = np.zeros(env.action_space.n) + env_action[action] = 1 + obs, rew, done, _ = env.step(env_action) + episode_rew += rew + print('Episode reward', episode_rew) + + +if __name__ == '__main__': + main() diff --git a/baselines/deepq/experiments/run_retro.py 
b/baselines/deepq/experiments/run_retro.py new file mode 100644 index 0000000..0338361 --- /dev/null +++ b/baselines/deepq/experiments/run_retro.py @@ -0,0 +1,49 @@ +import argparse + +from baselines import deepq +from baselines.common import set_global_seeds +from baselines import bench +from baselines import logger +from baselines.common import retro_wrappers +import retro + + +def main(): + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('--env', help='environment ID', default='SuperMarioBros-Nes') + parser.add_argument('--gamestate', help='game state to load', default='Level1-1') + parser.add_argument('--seed', help='seed', type=int, default=0) + parser.add_argument('--num-timesteps', type=int, default=int(10e6)) + args = parser.parse_args() + logger.configure() + set_global_seeds(args.seed) + env = retro_wrappers.make_retro(game=args.env, state=args.gamestate, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE) + env.seed(args.seed) + env = bench.Monitor(env, logger.get_dir()) + env = retro_wrappers.wrap_deepmind_retro(env) + + model = deepq.models.cnn_to_mlp( + convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], + hiddens=[256], + dueling=True + ) + act = deepq.learn( + env, + q_func=model, + lr=1e-4, + max_timesteps=args.num_timesteps, + buffer_size=10000, + exploration_fraction=0.1, + exploration_final_eps=0.01, + train_freq=4, + learning_starts=10000, + target_network_update_freq=1000, + gamma=0.99, + prioritized_replay=True + ) + act.save() + env.close() + + +if __name__ == '__main__': + main() diff --git a/baselines/run.py b/baselines/run.py new file mode 100644 index 0000000..310aaa8 --- /dev/null +++ b/baselines/run.py @@ -0,0 +1,211 @@ +import sys +import multiprocessing +import os.path as osp +import gym +from collections import defaultdict + +from baselines.common.vec_env.vec_frame_stack import VecFrameStack +from baselines.common.cmd_util import common_arg_parser, parse_unknown_args, make_mujoco_env, make_atari_env +from baselines.common.tf_util import save_state, load_state +from baselines import bench, logger +from importlib import import_module + +from baselines.common.vec_env.vec_normalize import VecNormalize +from baselines.common.vec_env.dummy_vec_env import DummyVecEnv +from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv +from baselines.common import atari_wrappers, retro_wrappers + +try: + from mpi4py import MPI +except ImportError: + MPI = None + +_game_envs = defaultdict(set) +for env in gym.envs.registry.all(): + # solve this with regexes + env_type = env._entry_point.split(':')[0].split('.')[-1] + if env.id[:-3].endswith('NoFrameskip'): + _game_envs[env_type].add(env.id) + +# reading benchmark names directly from retro requires +# importing retro here, and for some reason that crashes tensorflow +# in ubuntu +if hasattr(bench.benchmarks, '_retro'): + _game_envs['retro'] = set([g[0] for g in bench.benchmarks._retro]) + + +def train(args, extra_args): + env_type, env_id = get_env_type(args.env) + + total_timesteps = int(args.num_timesteps) + seed = args.seed + + learn = get_learn_function(args.alg) + alg_kwargs = get_learn_function_defaults(args.alg, env_type) + alg_kwargs.update(extra_args) + + env = build_env(args) + + if args.network: + alg_kwargs['network'] = args.network + else: + if alg_kwargs.get('network') is None: + alg_kwargs['network'] = get_default_network(env_type) + + + + print('Training {} on {}:{} with arguments \n{}'.format(args.alg, env_type, env_id, 
alg_kwargs)) + + model = learn( + env=env, + seed=seed, + total_timesteps=total_timesteps, + **alg_kwargs + ) + + return model, env + + +def build_env(args, render=False): + ncpu = multiprocessing.cpu_count() + if sys.platform == 'darwin': ncpu //= 2 + nenv = args.num_env or ncpu if not render else 1 + alg = args.alg + rank = MPI.COMM_WORLD.Get_rank() if MPI else 0 + + seed = args.seed + 10000 * rank if args.seed is not None else None + + env_type, env_id = get_env_type(args.env) + if env_type == 'mujoco': + if args.num_env: + env = SubprocVecEnv([lambda: make_mujoco_env(env_id, seed + i, args.reward_scale) for i in range(args.num_env)]) + else: + env = DummyVecEnv([lambda: make_mujoco_env(env_id, seed, args.reward_scale)]) + + env = VecNormalize(env) + + elif env_type == 'atari': + if alg == 'acer': + env = make_atari_env(env_id, nenv, seed) + elif alg == 'deepq': + env = atari_wrappers.make_atari(env_id) + env.seed(seed) + env = bench.Monitor(env, logger.get_dir()) + env = atari_wrappers.wrap_deepmind(env, frame_stack=True, scale=True) + elif alg == 'trpo_mpi': + env = atari_wrappers.make_atari(env_id) + env.seed(seed) + env = bench.Monitor(env, logger.get_dir() and osp.join(logger.get_dir(), str(rank))) + env = atari_wrappers.wrap_deepmind(env) + # TODO check if the second seeding is necessary, and eventually remove + env.seed(seed) + else: + frame_stack_size = 4 + env = VecFrameStack(make_atari_env(env_id, nenv, seed), frame_stack_size) + + elif env_type == 'retro': + import retro + gamestate = args.gamestate or 'Level1-1' + env = retro_wrappers.make_retro(game=args.env, state=gamestate, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE) + env.seed(args.seed) + env = bench.Monitor(env, logger.get_dir()) + env = retro_wrappers.wrap_deepmind_retro(env) + + elif env_type == 'classic': + def make_env(): + e = gym.make(env_id) + e.seed(seed) + return e + + env = DummyVecEnv([make_env]) + + return env + + +def get_env_type(env_id): + if env_id in _game_envs.keys(): + env_type = env_id + env_id = [g for g in _game_envs[env_type]][0] + else: + env_type = None + for g, e in _game_envs.items(): + if env_id in e: + env_type = g + break + assert env_type is not None, 'env_id {} is not recognized in env types'.format(env_id, _game_envs.keys()) + + return env_type, env_id + +def get_default_network(env_type): + if env_type == 'mujoco' or env_type=='classic': + return 'mlp' + if env_type == 'atari': + return 'cnn' + + raise ValueError('Unknown env_type {}'.format(env_type)) + + +def get_learn_function(alg): + alg_module = import_module('.'.join(['baselines', alg, alg])) + return alg_module.learn + +def get_learn_function_defaults(alg, env_type): + try: + alg_defaults = import_module('.'.join(['baselines', alg, 'defaults'])) + kwargs = getattr(alg_defaults, env_type)() + except (ImportError, AttributeError): + kwargs = {} + + return kwargs + +def parse(v): + ''' + convert value of a command-line arg to a python object if possible, othewise, keep as string + ''' + + assert isinstance(v, str) + try: + return eval(v) + except (NameError, SyntaxError): + return v + + +def main(): + # configure logger, disable logging in child MPI processes (with rank > 0) + + arg_parser = common_arg_parser() + args, unknown_args = arg_parser.parse_known_args() + extra_args = {k: parse(v) for k,v in parse_unknown_args(unknown_args).items()} + + if MPI is None or MPI.COMM_WORLD.Get_rank() == 0: + rank = 0 + logger.configure() + else: + logger.configure(format_strs = []) + rank = 
MPI.COMM_WORLD.Get_rank()
+
+    if args.model_path is not None:
+        if osp.exists(args.model_path):
+            load_state(osp.expanduser(args.model_path))
+
+    model, _ = train(args, extra_args)
+
+    if args.model_path is not None and rank == 0:
+        save_state(osp.expanduser(args.model_path))
+
+
+    if args.play:
+        logger.log("Running trained model")
+        env = build_env(args, render=True)
+        obs = env.reset()
+        while True:
+            actions = model.step(obs)[0]
+            obs, _, done, _ = env.step(actions)
+            env.render()
+            if done:
+                obs = env.reset()
+
+
+
+if __name__ == '__main__':
+    main()
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..3493c45
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,19 @@
+import pytest
+
+
+def pytest_addoption(parser):
+    parser.addoption('--runslow', action='store_true', default=False, help='run slow tests')
+
+
+def pytest_collection_modifyitems(config, items):
+    if config.getoption('--runslow'):
+        # --runslow given in cli: do not skip slow tests
+        return
+    skip_slow = pytest.mark.skip(reason='need --runslow option to run')
+    slow_tests = []
+    for item in items:
+        if 'slow' in item.keywords:
+            slow_tests.append(item.name)
+            item.add_marker(skip_slow)
+
+    print('skipping slow tests', ' '.join(slow_tests), 'use --runslow to run them')