make baselines run without mpi wip

This commit is contained in:
Peter Zhokhov
2018-10-19 17:00:41 -07:00
parent d0cc325e14
commit d96e20ff27
6 changed files with 56 additions and 29 deletions

View File

@@ -1,16 +1,7 @@
FROM ubuntu:16.04 FROM python:3.6
RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake python-opencv # RUN apt-get -y update && apt-get -y install git wget python-dev python3-dev libopenmpi-dev python-pip zlib1g-dev cmake python-opencv
ENV CODE_DIR /root/code ENV CODE_DIR /root/code
ENV VENV /root/venv
RUN \
pip install virtualenv && \
virtualenv $VENV --python=python3 && \
. $VENV/bin/activate && \
pip install --upgrade pip
ENV PATH=$VENV/bin:$PATH
COPY . $CODE_DIR/baselines COPY . $CODE_DIR/baselines
WORKDIR $CODE_DIR/baselines WORKDIR $CODE_DIR/baselines

View File

@@ -1,4 +1,8 @@
from mpi4py import MPI try:
from mpi4py import MPI
except ImportError:
MPI = None
import tensorflow as tf, baselines.common.tf_util as U, numpy as np import tensorflow as tf, baselines.common.tf_util as U, numpy as np
class RunningMeanStd(object): class RunningMeanStd(object):
@@ -39,7 +43,8 @@ class RunningMeanStd(object):
n = int(np.prod(self.shape)) n = int(np.prod(self.shape))
totalvec = np.zeros(n*2+1, 'float64') totalvec = np.zeros(n*2+1, 'float64')
addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')]) addvec = np.concatenate([x.sum(axis=0).ravel(), np.square(x).sum(axis=0).ravel(), np.array([len(x)],dtype='float64')])
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM) if MPI is not None:
MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM)
self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n]) self.incfiltparams(totalvec[0:n].reshape(self.shape), totalvec[n:2*n].reshape(self.shape), totalvec[2*n])
@U.in_session @U.in_session

View File

@@ -9,7 +9,6 @@ from baselines import logger
from baselines.common.mpi_adam import MpiAdam from baselines.common.mpi_adam import MpiAdam
import baselines.common.tf_util as U import baselines.common.tf_util as U
from baselines.common.mpi_running_mean_std import RunningMeanStd from baselines.common.mpi_running_mean_std import RunningMeanStd
from mpi4py import MPI
def normalize(x, stats): def normalize(x, stats):
if stats is None: if stats is None:
@@ -358,6 +357,11 @@ class DDPG(object):
return stats return stats
def adapt_param_noise(self): def adapt_param_noise(self):
try:
from mpi4py import MPI
except ImportError:
MPI = None
if self.param_noise is None: if self.param_noise is None:
return 0. return 0.
@@ -371,7 +375,11 @@ class DDPG(object):
self.param_noise_stddev: self.param_noise.current_stddev, self.param_noise_stddev: self.param_noise.current_stddev,
}) })
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size() if MPI is not None:
mean_distance = MPI.COMM_WORLD.allreduce(distance, op=MPI.SUM) / MPI.COMM_WORLD.Get_size()
else:
mean_distance = distance
self.param_noise.adapt(mean_distance) self.param_noise.adapt(mean_distance)
return mean_distance return mean_distance

View File

@@ -12,7 +12,11 @@ from baselines.common.runners import AbstractEnvRunner
from baselines.common.tf_util import get_session, save_variables, load_variables from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
from mpi4py import MPI try:
from mpi4py import MPI
except ImportError:
MPI = None
from baselines.common.tf_util import initialize from baselines.common.tf_util import initialize
from baselines.common.mpi_util import sync_from_root from baselines.common.mpi_util import sync_from_root
@@ -93,7 +97,10 @@ class Model(object):
# 1. Get the model parameters # 1. Get the model parameters
params = tf.trainable_variables('ppo2_model') params = tf.trainable_variables('ppo2_model')
# 2. Build our trainer # 2. Build our trainer
trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5) if MPI is not None:
trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
else:
trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients # 3. Calculate the gradients
grads_and_var = trainer.compute_gradients(loss, params) grads_and_var = trainer.compute_gradients(loss, params)
grads, var = zip(*grads_and_var) grads, var = zip(*grads_and_var)
@@ -136,7 +143,7 @@ class Model(object):
self.save = functools.partial(save_variables, sess=sess) self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess) self.load = functools.partial(load_variables, sess=sess)
if MPI.COMM_WORLD.Get_rank() == 0: if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
initialize() initialize()
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="") global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
sync_from_root(sess, global_variables) #pylint: disable=E1101 sync_from_root(sess, global_variables) #pylint: disable=E1101
@@ -392,9 +399,9 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
logger.logkv('time_elapsed', tnow - tfirststart) logger.logkv('time_elapsed', tnow - tfirststart)
for (lossval, lossname) in zip(lossvals, model.loss_names): for (lossval, lossname) in zip(lossvals, model.loss_names):
logger.logkv(lossname, lossval) logger.logkv(lossname, lossval)
if MPI.COMM_WORLD.Get_rank() == 0: if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
logger.dumpkvs() logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and MPI.COMM_WORLD.Get_rank() == 0: if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
checkdir = osp.join(logger.get_dir(), 'checkpoints') checkdir = osp.join(logger.get_dir(), 'checkpoints')
os.makedirs(checkdir, exist_ok=True) os.makedirs(checkdir, exist_ok=True)
savepath = osp.join(checkdir, '%.5i'%update) savepath = osp.join(checkdir, '%.5i'%update)

View File

@@ -4,7 +4,6 @@ import baselines.common.tf_util as U
import tensorflow as tf, numpy as np import tensorflow as tf, numpy as np
import time import time
from baselines.common import colorize from baselines.common import colorize
from mpi4py import MPI
from collections import deque from collections import deque
from baselines.common import set_global_seeds from baselines.common import set_global_seeds
from baselines.common.mpi_adam import MpiAdam from baselines.common.mpi_adam import MpiAdam
@@ -13,6 +12,11 @@ from baselines.common.input import observation_placeholder
from baselines.common.policies import build_policy from baselines.common.policies import build_policy
from contextlib import contextmanager from contextlib import contextmanager
try:
from mpi4py import MPI
except ImportError:
MPI = None
def traj_segment_generator(pi, env, horizon, stochastic): def traj_segment_generator(pi, env, horizon, stochastic):
# Initialize state variables # Initialize state variables
t = 0 t = 0
@@ -146,9 +150,12 @@ def learn(*,
''' '''
if MPI is not None:
nworkers = MPI.COMM_WORLD.Get_size() nworkers = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank() rank = MPI.COMM_WORLD.Get_rank()
else:
nworkers = 1
rank = 0
cpus_per_worker = 1 cpus_per_worker = 1
U.get_session(config=tf.ConfigProto( U.get_session(config=tf.ConfigProto(
@@ -238,8 +245,9 @@ def learn(*,
def allmean(x): def allmean(x):
assert isinstance(x, np.ndarray) assert isinstance(x, np.ndarray)
out = np.empty_like(x) out = np.empty_like(x)
MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM) if MPI is not None:
out /= nworkers MPI.COMM_WORLD.Allreduce(x, out, op=MPI.SUM)
out /= nworkers
return out return out
U.initialize() U.initialize()
@@ -247,7 +255,9 @@ def learn(*,
pi.load(load_path) pi.load(load_path)
th_init = get_flat() th_init = get_flat()
MPI.COMM_WORLD.Bcast(th_init, root=0) if MPI is not None:
MPI.COMM_WORLD.Bcast(th_init, root=0)
set_from_flat(th_init) set_from_flat(th_init)
vfadam.sync() vfadam.sync()
print("Init param sum", th_init.sum(), flush=True) print("Init param sum", th_init.sum(), flush=True)
@@ -353,7 +363,11 @@ def learn(*,
logger.record_tabular("ev_tdlam_before", explained_variance(vpredbefore, tdlamret)) logger.record_tabular("ev_tdlam_before", explained_variance(vpredbefore, tdlamret))
lrlocal = (seg["ep_lens"], seg["ep_rets"]) # local values lrlocal = (seg["ep_lens"], seg["ep_rets"]) # local values
listoflrpairs = MPI.COMM_WORLD.allgather(lrlocal) # list of tuples if MPI is not None:
listoflrpairs = MPI.COMM_WORLD.allgather(lrlocal) # list of tuples
else
listoflrpairs = [lrlocal]
lens, rews = map(flatten_lists, zip(*listoflrpairs)) lens, rews = map(flatten_lists, zip(*listoflrpairs))
lenbuffer.extend(lens) lenbuffer.extend(lens)
rewbuffer.extend(rews) rewbuffer.extend(rews)

View File

@@ -15,6 +15,9 @@ extras = {
], ],
'bullet': [ 'bullet': [
'pybullet', 'pybullet',
],
'mpi': [
'mpi4py'
] ]
} }
@@ -34,7 +37,6 @@ setup(name='baselines',
'joblib', 'joblib',
'dill', 'dill',
'progressbar2', 'progressbar2',
'mpi4py',
'cloudpickle', 'cloudpickle',
'click', 'click',
'opencv-python' 'opencv-python'