mpi-less baselines (#689)

* make baselines run without mpi wip

* squash-merged latest master

* further removing MPI references where unnecessary

* more MPI removal

* syntax and flake8

* MpiAdam becomes regular Adam if MPI not present

* autopep8

* add assertion to test in mpi_adam; fix trpo_mpi failure without MPI on cartpole

* mpiless ddpg
Author: pzhokhov
Date: 2018-10-31 11:15:41 -07:00
Committed by: GitHub
Parent: a071fa7630
Commit: ab59de6922
8 changed files with 124 additions and 45 deletions
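
Every file below applies the same idiom: import mpi4py behind a try/except, bind MPI to None when it is missing, and branch on MPI at each collective call so a single process behaves like a one-worker job. A condensed sketch of the pattern (illustrative only; this allmean helper is adapted from the trpo_mpi hunk below, not a new file in this diff):

import numpy as np

try:
    from mpi4py import MPI
except ImportError:
    MPI = None  # no MPI runtime: every rank-dependent branch falls back below

def allmean(x):
    # With MPI, sum x across all workers and divide by the worker count;
    # without it, a lone worker's value is already the mean.
    if MPI is not None:
        out = np.empty_like(x)
        MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM)
        out /= MPI.COMM_WORLD.Get_size()
    else:
        out = np.copy(x)
    return out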

Dockerfile

@@ -1,16 +1,7 @@
-FROM ubuntu:16.04
+FROM python:3.6
-RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake python-opencv
+# RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake python-opencv
 ENV CODE_DIR /root/code
-ENV VENV /root/venv
-
-RUN \
-    pip install virtualenv && \
-    virtualenv $VENV --python=python3 && \
-    . $VENV/bin/activate && \
-    pip install --upgrade pip
-
-ENV PATH=$VENV/bin:$PATH

 COPY . $CODE_DIR/baselines
 WORKDIR $CODE_DIR/baselines

baselines/common/mpi_adam.py

@@ -1,7 +1,11 @@
-from mpi4py import MPI
 import baselines.common.tf_util as U
 import tensorflow as tf
 import numpy as np
+try:
+    from mpi4py import MPI
+except ImportError:
+    MPI = None

 class MpiAdam(object):
     def __init__(self, var_list, *, beta1=0.9, beta2=0.999, epsilon=1e-08, scale_grad_by_procs=True, comm=None):
@@ -16,16 +20,19 @@ class MpiAdam(object):
         self.t = 0
         self.setfromflat = U.SetFromFlat(var_list)
         self.getflat = U.GetFlat(var_list)
-        self.comm = MPI.COMM_WORLD if comm is None else comm
+        self.comm = MPI.COMM_WORLD if comm is None and MPI is not None else comm

     def update(self, localg, stepsize):
         if self.t % 100 == 0:
             self.check_synced()
         localg = localg.astype('float32')
-        globalg = np.zeros_like(localg)
-        self.comm.Allreduce(localg, globalg, op=MPI.SUM)
-        if self.scale_grad_by_procs:
-            globalg /= self.comm.Get_size()
+        if self.comm is not None:
+            globalg = np.zeros_like(localg)
+            self.comm.Allreduce(localg, globalg, op=MPI.SUM)
+            if self.scale_grad_by_procs:
+                globalg /= self.comm.Get_size()
+        else:
+            globalg = np.copy(localg)

         self.t += 1
         a = stepsize * np.sqrt(1 - self.beta2**self.t)/(1 - self.beta1**self.t)
@@ -35,11 +42,15 @@ class MpiAdam(object):
         self.setfromflat(self.getflat() + step)

     def sync(self):
+        if self.comm is None:
+            return
         theta = self.getflat()
         self.comm.Bcast(theta, root=0)
         self.setfromflat(theta)

     def check_synced(self):
+        if self.comm is None:
+            return
         if self.comm.Get_rank() == 0: # this is root
             theta = self.getflat()
             self.comm.Bcast(theta, root=0)
@@ -63,17 +74,30 @@ def test_MpiAdam():
     do_update = U.function([], loss, updates=[update_op])

     tf.get_default_session().run(tf.global_variables_initializer())
+
+    losslist_ref = []
     for i in range(10):
-        print(i,do_update())
+        l = do_update()
+        print(i, l)
+        losslist_ref.append(l)

     tf.set_random_seed(0)
     tf.get_default_session().run(tf.global_variables_initializer())

     var_list = [a,b]
-    lossandgrad = U.function([], [loss, U.flatgrad(loss, var_list)], updates=[update_op])
+    lossandgrad = U.function([], [loss, U.flatgrad(loss, var_list)])
     adam = MpiAdam(var_list)

+    losslist_test = []
     for i in range(10):
         l,g = lossandgrad()
         adam.update(g, stepsize)
         print(i,l)
+        losslist_test.append(l)
+
+    np.testing.assert_allclose(np.array(losslist_ref), np.array(losslist_test), atol=1e-4)
+
+
+if __name__ == '__main__':
+    test_MpiAdam()
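
With self.comm left as None, update() skips the Allreduce and applies an ordinary Adam step to the local gradient, which is what the commit message means by MpiAdam becoming regular Adam. That step is the usual bias-corrected update matching the "a = stepsize * np.sqrt(...)" line above; a standalone sketch for reference (adam_step is a hypothetical helper, not part of the commit):

import numpy as np

def adam_step(m, v, g, t, stepsize, beta1=0.9, beta2=0.999, epsilon=1e-08):
    # One bias-corrected Adam step; t is the 1-based timestep counter.
    # Without MPI, g is simply the local gradient instead of a cross-worker average.
    a = stepsize * np.sqrt(1 - beta2**t) / (1 - beta1**t)
    m = beta1 * m + (1 - beta1) * g
    v = beta2 * v + (1 - beta2) * np.square(g)
    step = (-a) * m / (np.sqrt(v) + epsilon)
    return m, v, step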

baselines/common/mpi_running_mean_std.py

@@ -1,4 +1,8 @@
-from mpi4py import MPI
+try:
+    from mpi4py import MPI
+except ImportError:
+    MPI = None
 import tensorflow as tf, baselines.common.tf_util as U, numpy as np

 class RunningMeanStd(object):
@@ -39,7 +43,8 @@ class RunningMeanStd(object):
         n = int(np.prod(self.shape))
         totalvec = np.zeros(n*2+1, 'float64')
         addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')])
-        MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
+        if MPI is not None:
+            MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
         self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n])

 @U.in_session

baselines/ddpg/ddpg.py

@@ -12,8 +12,11 @@ import baselines.common.tf_util as U
 from baselines import logger
 import numpy as np

-from mpi4py import MPI
+try:
+    from mpi4py import MPI
+except ImportError:
+    MPI = None

 def learn(network, env,
           seed=None,
@@ -49,7 +52,11 @@ def learn(network, env,
     else:
         nb_epochs = 500

-    rank = MPI.COMM_WORLD.Get_rank()
+    if MPI is not None:
+        rank = MPI.COMM_WORLD.Get_rank()
+    else:
+        rank = 0

     nb_actions = env.action_space.shape[-1]
     assert (np.abs(env.action_space.low) == env.action_space.high).all() # we assume symmetric actions.
@@ -199,7 +206,11 @@ def learn(network, env,
                         eval_episode_rewards_history.append(eval_episode_reward[d])
                         eval_episode_reward[d] = 0.0

-    mpi_size = MPI.COMM_WORLD.Get_size()
+    if MPI is not None:
+        mpi_size = MPI.COMM_WORLD.Get_size()
+    else:
+        mpi_size = 1
+
     # Log stats.
     # XXX shouldn't call np.mean on variable length lists
     duration = time.time() - start_time
@@ -233,7 +244,10 @@ def learn(network, env,
         else:
             raise ValueError('expected scalar, got %s'%x)

-    combined_stats_sums = MPI.COMM_WORLD.allreduce(np.array([ np.array(x).flatten()[0] for x in combined_stats.values()]))
+    combined_stats_sums = np.array([ np.array(x).flatten()[0] for x in combined_stats.values()])
+    if MPI is not None:
+        combined_stats_sums = MPI.COMM_WORLD.allreduce(combined_stats_sums)
+
     combined_stats = {k : v / mpi_size for (k,v) in zip(combined_stats.keys(), combined_stats_sums)}

     # Total statistics.
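
Note the ordering here: mpi_size was already computed earlier in learn(), so in the single-process case the allreduce is skipped and dividing by mpi_size == 1 leaves the local stats unchanged.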

baselines/ddpg/ddpg_learner.py

@@ -9,7 +9,10 @@ from baselines import logger
 from baselines.common.mpi_adam import MpiAdam
 import baselines.common.tf_util as U
 from baselines.common.mpi_running_mean_std import RunningMeanStd
-from mpi4py import MPI
+try:
+    from mpi4py import MPI
+except ImportError:
+    MPI = None

 def normalize(x, stats):
     if stats is None:
@@ -358,6 +361,11 @@ class DDPG(object):
         return stats

     def adapt_param_noise(self):
+        try:
+            from mpi4py import MPI
+        except ImportError:
+            MPI = None
+
         if self.param_noise is None:
             return 0.
@@ -371,7 +379,16 @@
             self.param_noise_stddev: self.param_noise.current_stddev,
         })

-        mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
+        if MPI is not None:
+            mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
+        else:
+            mean_distance = distance
+
+        if MPI is not None:
+            mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
+        else:
+            mean_distance = distance

         self.param_noise.adapt(mean_distance)
         return mean_distance

baselines/ppo2/ppo2.py

@@ -10,11 +10,15 @@ from baselines.common import explained_variance, set_global_seeds
 from baselines.common.policies import build_policy
 from baselines.common.runners import AbstractEnvRunner
 from baselines.common.tf_util import get_session, save_variables, load_variables
-from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
-from mpi4py import MPI
+try:
+    from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
+    from mpi4py import MPI
+    from baselines.common.mpi_util import sync_from_root
+except ImportError:
+    MPI = None
 from baselines.common.tf_util import initialize
-from baselines.common.mpi_util import sync_from_root

 class Model(object):
     """
@@ -93,7 +97,10 @@ class Model(object):
         # 1. Get the model parameters
         params = tf.trainable_variables('ppo2_model')

         # 2. Build our trainer
-        trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
+        if MPI is not None:
+            trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
+        else:
+            trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)

         # 3. Calculate the gradients
         grads_and_var = trainer.compute_gradients(loss, params)
         grads, var = zip(*grads_and_var)
@@ -136,10 +143,12 @@ class Model(object):
         self.save = functools.partial(save_variables, sess=sess)
         self.load = functools.partial(load_variables, sess=sess)

-        if MPI.COMM_WORLD.Get_rank() == 0:
+        if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
             initialize()
         global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
-        sync_from_root(sess, global_variables) #pylint: disable=E1101
+
+        if MPI is not None:
+            sync_from_root(sess, global_variables) #pylint: disable=E1101

 class Runner(AbstractEnvRunner):
     """
@@ -392,9 +401,9 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
             logger.logkv('time_elapsed', tnow - tfirststart)
             for (lossval, lossname) in zip(lossvals, model.loss_names):
                 logger.logkv(lossname, lossval)
-            if MPI.COMM_WORLD.Get_rank() == 0:
+            if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
                 logger.dumpkvs()
-        if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and MPI.COMM_WORLD.Get_rank() == 0:
+        if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
             checkdir = osp.join(logger.get_dir(), 'checkpoints')
             os.makedirs(checkdir, exist_ok=True)
             savepath = osp.join(checkdir, '%.5i'%update)
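
The drop-in swap of the trainer works because MpiAdamOptimizer is built on tf.train.AdamOptimizer and exposes the same compute_gradients/apply_gradients interface; the only remaining distributed step, sync_from_root, is likewise skipped when MPI is absent, since a single worker has nothing to synchronize.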

baselines/trpo_mpi/trpo_mpi.py

@@ -4,7 +4,6 @@ import baselines.common.tf_util as U
 import tensorflow as tf, numpy as np
 import time
 from baselines.common import colorize
-from mpi4py import MPI
 from collections import deque
 from baselines.common import set_global_seeds
 from baselines.common.mpi_adam import MpiAdam
@@ -13,6 +12,11 @@ from baselines.common.input import observation_placeholder
 from baselines.common.policies import build_policy
 from contextlib import contextmanager

+try:
+    from mpi4py import MPI
+except ImportError:
+    MPI = None
+
 def traj_segment_generator(pi, env, horizon, stochastic):
     # Initialize state variables
     t = 0
@@ -146,9 +150,12 @@ def learn(*,
     '''

-    nworkers = MPI.COMM_WORLD.Get_size()
-    rank = MPI.COMM_WORLD.Get_rank()
+    if MPI is not None:
+        nworkers = MPI.COMM_WORLD.Get_size()
+        rank = MPI.COMM_WORLD.Get_rank()
+    else:
+        nworkers = 1
+        rank = 0

     cpus_per_worker = 1
     U.get_session(config=tf.ConfigProto(
@@ -237,9 +244,13 @@ def learn(*,
     def allmean(x):
         assert isinstance(x, np.ndarray)
-        out = np.empty_like(x)
-        MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM)
-        out /= nworkers
+        if MPI is not None:
+            out = np.empty_like(x)
+            MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM)
+            out /= nworkers
+        else:
+            out = np.copy(x)
         return out

     U.initialize()
@@ -247,7 +258,9 @@ def learn(*,
         pi.load(load_path)

     th_init = get_flat()
-    MPI.COMM_WORLD.Bcast(th_init, root=0)
+    if MPI is not None:
+        MPI.COMM_WORLD.Bcast(th_init, root=0)
+
     set_from_flat(th_init)
     vfadam.sync()
     print("Init param sum", th_init.sum(), flush=True)
@@ -353,7 +366,11 @@ def learn(*,
         logger.record_tabular("ev_tdlam_before", explained_variance(vpredbefore, tdlamret))

         lrlocal = (seg["ep_lens"], seg["ep_rets"]) # local values
-        listoflrpairs = MPI.COMM_WORLD.allgather(lrlocal) # list of tuples
+        if MPI is not None:
+            listoflrpairs = MPI.COMM_WORLD.allgather(lrlocal) # list of tuples
+        else:
+            listoflrpairs = [lrlocal]
+
         lens, rews = map(flatten_lists, zip(*listoflrpairs))
         lenbuffer.extend(lens)
         rewbuffer.extend(rews)

setup.py

@@ -15,6 +15,9 @@ extras = {
     ],
     'bullet': [
         'pybullet',
+    ],
+    'mpi': [
+        'mpi4py'
     ]
 }
@@ -34,7 +37,6 @@ setup(name='baselines',
        'joblib',
        'dill',
        'progressbar2',
-       'mpi4py',
        'cloudpickle',
        'click',
        'opencv-python'
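
With mpi4py demoted from install_requires to an extra, a plain pip install -e . no longer pulls in MPI at all; users who want the parallel code paths would opt in explicitly, e.g. pip install -e .[mpi] from a checkout of the repository.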