Compare commits

...

13 Commits

Author SHA1 Message Date
Peter Zhokhov
58801032fc install mpi4py in mpi dockerfile 2018-10-31 11:34:10 -07:00
Peter Zhokhov
b4a149a75f fix .travis.yml 2018-10-31 11:32:03 -07:00
Peter Zhokhov
c248bf9a46 CI dockerfiles with and without mpi 2018-10-31 11:27:45 -07:00
Peter Zhokhov
d1f7d12743 mpiless ddpg 2018-10-31 09:48:41 -07:00
Peter Zhokhov
f0d49fb67d add assertion to test in mpi_adam; fix trpo_mpi failure without MPI on cartpole 2018-10-30 14:45:20 -07:00
Peter Zhokhov
ef2e7246c9 autopep8 2018-10-30 14:11:38 -07:00
Peter Zhokhov
3e3e2b7998 MpiAdam becomes regular Adam if Mpi not present 2018-10-30 14:04:30 -07:00
Peter Zhokhov
d00f3bce34 syntax and flake8 2018-10-30 09:47:39 -07:00
Peter Zhokhov
72aa2f1251 more MPI removal 2018-10-29 15:43:56 -07:00
Peter Zhokhov
ea7a52b652 further removing MPI references where unnecessary 2018-10-29 15:38:16 -07:00
Peter Zhokhov
064c45fa76 Merge branch 'master' of github.com:openai/baselines into peterz_mpiless 2018-10-29 15:31:37 -07:00
Peter Zhokhov
6f148fdb0d squash-merged latest master 2018-10-29 15:28:59 -07:00
Peter Zhokhov
d96e20ff27 make baselines run without mpi wip 2018-10-19 17:00:41 -07:00
11 changed files with 162 additions and 63 deletions

View File

@@ -5,10 +5,14 @@ python:
services: services:
- docker - docker
env:
- DOCKER_SUFFIX=py36-nompi
- DOCKER_SUFFIX=py36-mpi
install: install:
- pip install flake8 - pip install flake8
- docker build . -t baselines-test - docker build -f test.dockerfile.${DOCKER_SUFFIX} -t baselines-test .
script: script:
- flake8 . --show-source --statistics - flake8 . --show-source --statistics
- docker run baselines-test pytest -v . - docker run baselines-test pytest -v .

View File

@@ -1,25 +0,0 @@
FROM ubuntu:16.04
RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake python-opencv
ENV CODE_DIR /root/code
ENV VENV /root/venv
RUN \
pip install virtualenv && \
virtualenv $VENV --python=python3 && \
. $VENV/bin/activate && \
pip install --upgrade pip
ENV PATH=$VENV/bin:$PATH
COPY . $CODE_DIR/baselines
WORKDIR $CODE_DIR/baselines
# Clean up pycache and pyc files
RUN rm -rf __pycache__ && \
find . -name "*.pyc" -delete && \
pip install tensorflow && \
pip install -e .[test]
CMD /bin/bash

View File

@@ -1,7 +1,11 @@
from mpi4py import MPI
import baselines.common.tf_util as U import baselines.common.tf_util as U
import tensorflow as tf import tensorflow as tf
import numpy as np import numpy as np
try:
from mpi4py import MPI
except ImportError:
MPI = None
class MpiAdam(object): class MpiAdam(object):
def __init__(self, var_list, *, beta1=0.9, beta2=0.999, epsilon=1e-08, scale_grad_by_procs=True, comm=None): def __init__(self, var_list, *, beta1=0.9, beta2=0.999, epsilon=1e-08, scale_grad_by_procs=True, comm=None):
@@ -16,16 +20,19 @@ class MpiAdam(object):
self.t = 0 self.t = 0
self.setfromflat = U.SetFromFlat(var_list) self.setfromflat = U.SetFromFlat(var_list)
self.getflat = U.GetFlat(var_list) self.getflat = U.GetFlat(var_list)
self.comm = MPI.COMM_WORLD if comm is None else comm self.comm = MPI.COMM_WORLD if comm is None and MPI is not None else comm
def update(self, localg, stepsize): def update(self, localg, stepsize):
if self.t % 100 == 0: if self.t % 100 == 0:
self.check_synced() self.check_synced()
localg = localg.astype('float32') localg = localg.astype('float32')
globalg = np.zeros_like(localg) if self.comm is not None:
self.comm.Allreduce(localg, globalg, op=MPI.SUM) globalg = np.zeros_like(localg)
if self.scale_grad_by_procs: self.comm.Allreduce(localg, globalg, op=MPI.SUM)
globalg /= self.comm.Get_size() if self.scale_grad_by_procs:
globalg /= self.comm.Get_size()
else:
globalg = np.copy(localg)
self.t += 1 self.t += 1
a = stepsize * np.sqrt(1 - self.beta2**self.t)/(1 - self.beta1**self.t) a = stepsize * np.sqrt(1 - self.beta2**self.t)/(1 - self.beta1**self.t)
@@ -35,11 +42,15 @@ class MpiAdam(object):
self.setfromflat(self.getflat() + step) self.setfromflat(self.getflat() + step)
def sync(self): def sync(self):
if self.comm is None:
return
theta = self.getflat() theta = self.getflat()
self.comm.Bcast(theta, root=0) self.comm.Bcast(theta, root=0)
self.setfromflat(theta) self.setfromflat(theta)
def check_synced(self): def check_synced(self):
if self.comm is None:
return
if self.comm.Get_rank() == 0: # this is root if self.comm.Get_rank() == 0: # this is root
theta = self.getflat() theta = self.getflat()
self.comm.Bcast(theta, root=0) self.comm.Bcast(theta, root=0)
@@ -63,17 +74,30 @@ def test_MpiAdam():
do_update = U.function([], loss, updates=[update_op]) do_update = U.function([], loss, updates=[update_op])
tf.get_default_session().run(tf.global_variables_initializer()) tf.get_default_session().run(tf.global_variables_initializer())
losslist_ref = []
for i in range(10): for i in range(10):
print(i,do_update()) l = do_update()
print(i, l)
losslist_ref.append(l)
tf.set_random_seed(0) tf.set_random_seed(0)
tf.get_default_session().run(tf.global_variables_initializer()) tf.get_default_session().run(tf.global_variables_initializer())
var_list = [a,b] var_list = [a,b]
lossandgrad = U.function([], [loss, U.flatgrad(loss, var_list)], updates=[update_op]) lossandgrad = U.function([], [loss, U.flatgrad(loss, var_list)])
adam = MpiAdam(var_list) adam = MpiAdam(var_list)
losslist_test = []
for i in range(10): for i in range(10):
l,g = lossandgrad() l,g = lossandgrad()
adam.update(g, stepsize) adam.update(g, stepsize)
print(i,l) print(i,l)
losslist_test.append(l)
np.testing.assert_allclose(np.array(losslist_ref), np.array(losslist_test), atol=1e-4)
if __name__ == '__main__':
test_MpiAdam()

View File

@@ -1,4 +1,8 @@
from mpi4py import MPI try:
from mpi4py import MPI
except ImportError:
MPI = None
import tensorflow as tf, baselines.common.tf_util as U, numpy as np import tensorflow as tf, baselines.common.tf_util as U, numpy as np
class RunningMeanStd(object): class RunningMeanStd(object):
@@ -39,7 +43,8 @@ class RunningMeanStd(object):
n = int(np.prod(self.shape)) n = int(np.prod(self.shape))
totalvec = np.zeros(n*2+1, 'float64') totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')]) addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM) if MPI is not None:
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n]) self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n])
@U.in_session @U.in_session

View File

@@ -12,8 +12,11 @@ import baselines.common.tf_util as U
from baselines import logger from baselines import logger
import numpy as np import numpy as np
from mpi4py import MPI
try:
from mpi4py import MPI
except ImportError:
MPI = None
def learn(network, env, def learn(network, env,
seed=None, seed=None,
@@ -49,7 +52,11 @@ def learn(network, env,
else: else:
nb_epochs = 500 nb_epochs = 500
rank = MPI.COMM_WORLD.Get_rank() if MPI is not None:
rank = MPI.COMM_WORLD.Get_rank()
else:
rank = 0
nb_actions = env.action_space.shape[-1] nb_actions = env.action_space.shape[-1]
assert (np.abs(env.action_space.low) == env.action_space.high).all() # we assume symmetric actions. assert (np.abs(env.action_space.low) == env.action_space.high).all() # we assume symmetric actions.
@@ -200,7 +207,11 @@ def learn(network, env,
eval_episode_rewards_history.append(eval_episode_reward[d]) eval_episode_rewards_history.append(eval_episode_reward[d])
eval_episode_reward[d] = 0.0 eval_episode_reward[d] = 0.0
mpi_size = MPI.COMM_WORLD.Get_size() if MPI is not None:
mpi_size = MPI.COMM_WORLD.Get_size()
else:
mpi_size = 1
# Log stats. # Log stats.
# XXX shouldn't call np.mean on variable length lists # XXX shouldn't call np.mean on variable length lists
duration = time.time() - start_time duration = time.time() - start_time
@@ -234,7 +245,10 @@ def learn(network, env,
else: else:
raise ValueError('expected scalar, got %s'%x) raise ValueError('expected scalar, got %s'%x)
combined_stats_sums = MPI.COMM_WORLD.allreduce(np.array([ np.array(x).flatten()[0] for x in combined_stats.values()])) combined_stats_sums = np.array([ np.array(x).flatten()[0] for x in combined_stats.values()])
if MPI is not None:
combined_stats_sums = MPI.COMM_WORLD.allreduce(combined_stats_sums)
combined_stats = {k : v / mpi_size for (k,v) in zip(combined_stats.keys(), combined_stats_sums)} combined_stats = {k : v / mpi_size for (k,v) in zip(combined_stats.keys(), combined_stats_sums)}
# Total statistics. # Total statistics.

View File

@@ -9,7 +9,10 @@ from baselines import logger
from baselines.common.mpi_adam import MpiAdam from baselines.common.mpi_adam import MpiAdam
import baselines.common.tf_util as U import baselines.common.tf_util as U
from baselines.common.mpi_running_mean_std import RunningMeanStd from baselines.common.mpi_running_mean_std import RunningMeanStd
from mpi4py import MPI try:
from mpi4py import MPI
except ImportError:
MPI = None
def normalize(x, stats): def normalize(x, stats):
if stats is None: if stats is None:
@@ -358,6 +361,11 @@ class DDPG(object):
return stats return stats
def adapt_param_noise(self): def adapt_param_noise(self):
try:
from mpi4py import MPI
except ImportError:
MPI = None
if self.param_noise is None: if self.param_noise is None:
return 0. return 0.
@@ -371,7 +379,16 @@ class DDPG(object):
self.param_noise_stddev: self.param_noise.current_stddev, self.param_noise_stddev: self.param_noise.current_stddev,
}) })
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size() if MPI is not None:
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
else:
mean_distance = distance
if MPI is not None:
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
else:
mean_distance = distance
self.param_noise.adapt(mean_distance) self.param_noise.adapt(mean_distance)
return mean_distance return mean_distance

View File

@@ -10,11 +10,15 @@ from baselines.common import explained_variance, set_global_seeds
from baselines.common.policies import build_policy from baselines.common.policies import build_policy
from baselines.common.runners import AbstractEnvRunner from baselines.common.runners import AbstractEnvRunner
from baselines.common.tf_util import get_session, save_variables, load_variables from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
from mpi4py import MPI try:
from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
from mpi4py import MPI
from baselines.common.mpi_util import sync_from_root
except ImportError:
MPI = None
from baselines.common.tf_util import initialize from baselines.common.tf_util import initialize
from baselines.common.mpi_util import sync_from_root
class Model(object): class Model(object):
""" """
@@ -93,7 +97,10 @@ class Model(object):
# 1. Get the model parameters # 1. Get the model parameters
params = tf.trainable_variables('ppo2_model') params = tf.trainable_variables('ppo2_model')
# 2. Build our trainer # 2. Build our trainer
trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5) if MPI is not None:
trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
else:
trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients # 3. Calculate the gradients
grads_and_var = trainer.compute_gradients(loss, params) grads_and_var = trainer.compute_gradients(loss, params)
grads, var = zip(*grads_and_var) grads, var = zip(*grads_and_var)
@@ -136,10 +143,12 @@ class Model(object):
self.save = functools.partial(save_variables, sess=sess) self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess) self.load = functools.partial(load_variables, sess=sess)
if MPI.COMM_WORLD.Get_rank() == 0: if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
initialize() initialize()
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="") global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
sync_from_root(sess, global_variables) #pylint: disable=E1101
if MPI is not None:
sync_from_root(sess, global_variables) #pylint: disable=E1101
class Runner(AbstractEnvRunner): class Runner(AbstractEnvRunner):
""" """
@@ -392,9 +401,9 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
logger.logkv('time_elapsed', tnow - tfirststart) logger.logkv('time_elapsed', tnow - tfirststart)
for (lossval, lossname) in zip(lossvals, model.loss_names): for (lossval, lossname) in zip(lossvals, model.loss_names):
logger.logkv(lossname, lossval) logger.logkv(lossname, lossval)
if MPI.COMM_WORLD.Get_rank() == 0: if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
logger.dumpkvs() logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and MPI.COMM_WORLD.Get_rank() == 0: if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
checkdir = osp.join(logger.get_dir(), 'checkpoints') checkdir = osp.join(logger.get_dir(), 'checkpoints')
os.makedirs(checkdir, exist_ok=True) os.makedirs(checkdir, exist_ok=True)
savepath = osp.join(checkdir, '%.5i'%update) savepath = osp.join(checkdir, '%.5i'%update)

View File

@@ -4,7 +4,6 @@ import baselines.common.tf_util as U
import tensorflow as tf, numpy as np import tensorflow as tf, numpy as np
import time import time
from baselines.common import colorize from baselines.common import colorize
from mpi4py import MPI
from collections import deque from collections import deque
from baselines.common import set_global_seeds from baselines.common import set_global_seeds
from baselines.common.mpi_adam import MpiAdam from baselines.common.mpi_adam import MpiAdam
@@ -13,6 +12,11 @@ from baselines.common.input import observation_placeholder
from baselines.common.policies import build_policy from baselines.common.policies import build_policy
from contextlib import contextmanager from contextlib import contextmanager
try:
from mpi4py import MPI
except ImportError:
MPI = None
def traj_segment_generator(pi, env, horizon, stochastic): def traj_segment_generator(pi, env, horizon, stochastic):
# Initialize state variables # Initialize state variables
t = 0 t = 0
@@ -146,9 +150,12 @@ def learn(*,
''' '''
if MPI is not None:
nworkers = MPI.COMM_WORLD.Get_size() nworkers = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank() rank = MPI.COMM_WORLD.Get_rank()
else:
nworkers = 1
rank = 0
cpus_per_worker = 1 cpus_per_worker = 1
U.get_session(config=tf.ConfigProto( U.get_session(config=tf.ConfigProto(
@@ -237,9 +244,13 @@ def learn(*,
def allmean(x): def allmean(x):
assert isinstance(x, np.ndarray) assert isinstance(x, np.ndarray)
out = np.empty_like(x) if MPI is not None:
MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM) out = np.empty_like(x)
out /= nworkers MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM)
out /= nworkers
else:
out = np.copy(x)
return out return out
U.initialize() U.initialize()
@@ -247,7 +258,9 @@ def learn(*,
pi.load(load_path) pi.load(load_path)
th_init = get_flat() th_init = get_flat()
MPI.COMM_WORLD.Bcast(th_init, root=0) if MPI is not None:
MPI.COMM_WORLD.Bcast(th_init, root=0)
set_from_flat(th_init) set_from_flat(th_init)
vfadam.sync() vfadam.sync()
print("Init param sum", th_init.sum(), flush=True) print("Init param sum", th_init.sum(), flush=True)
@@ -353,7 +366,11 @@ def learn(*,
logger.record_tabular("ev_tdlam_before", explained_variance(vpredbefore, tdlamret)) logger.record_tabular("ev_tdlam_before", explained_variance(vpredbefore, tdlamret))
lrlocal = (seg["ep_lens"], seg["ep_rets"]) # local values lrlocal = (seg["ep_lens"], seg["ep_rets"]) # local values
listoflrpairs = MPI.COMM_WORLD.allgather(lrlocal) # list of tuples if MPI is not None:
listoflrpairs = MPI.COMM_WORLD.allgather(lrlocal) # list of tuples
else:
listoflrpairs = [lrlocal]
lens, rews = map(flatten_lists, zip(*listoflrpairs)) lens, rews = map(flatten_lists, zip(*listoflrpairs))
lenbuffer.extend(lens) lenbuffer.extend(lens)
rewbuffer.extend(rews) rewbuffer.extend(rews)

View File

@@ -15,6 +15,9 @@ extras = {
], ],
'bullet': [ 'bullet': [
'pybullet', 'pybullet',
],
'mpi': [
'mpi4py'
] ]
} }
@@ -34,7 +37,6 @@ setup(name='baselines',
'joblib', 'joblib',
'dill', 'dill',
'progressbar2', 'progressbar2',
'mpi4py',
'cloudpickle', 'cloudpickle',
'click', 'click',
'opencv-python' 'opencv-python'

16
test.dockerfile.py36-mpi Normal file
View File

@@ -0,0 +1,16 @@
FROM python:3.6
RUN apt-get -y update && apt-get -y install ffmpeg libopenmpi-dev
ENV CODE_DIR /root/code
COPY . $CODE_DIR/baselines
WORKDIR $CODE_DIR/baselines
# Clean up pycache and pyc files
RUN rm -rf __pycache__ && \
find . -name "*.pyc" -delete && \
pip install tensorflow && \
pip install -e .[test,mpi]
CMD /bin/bash

test.dockerfile.py36-nompi Normal file

View File

@@ -0,0 +1,16 @@
FROM python:3.6
RUN apt-get -y update && apt-get -y install ffmpeg
ENV CODE_DIR /root/code
COPY . $CODE_DIR/baselines
WORKDIR $CODE_DIR/baselines
# Clean up pycache and pyc files
RUN rm -rf __pycache__ && \
find . -name "*.pyc" -delete && \
pip install tensorflow && \
pip install -e .[test]
CMD /bin/bash