Compare commits

..

1 Commits

Author SHA1 Message Date
Greg Brockman
6bbc4635e6 Update cmd_util with initializer, env_kwargs, and force_dummy 2019-03-18 17:53:42 -07:00
65 changed files with 621 additions and 1258 deletions

View File

@@ -11,4 +11,4 @@ install:
script:
- flake8 . --show-source --statistics
- docker run -e RUNSLOW=1 baselines-test pytest -v .
- docker run baselines-test pytest -v --forked .

View File

@@ -89,7 +89,7 @@ python -m baselines.run --alg=ppo2 --env=Humanoid-v2 --network=mlp --num_timeste
will set entropy coefficient to 0.1, and construct fully connected network with 3 layers with 32 hidden units in each, and create a separate network for value function estimation (so that its parameters are not shared with the policy network, but the structure is the same)
See docstrings in [common/models.py](baselines/common/models.py) for description of network parameters for each type of model, and
docstring for [baselines/ppo2/ppo2.py/learn()](baselines/ppo2/ppo2.py#L152) for the description of the ppo2 hyperparameters.
docstring for [baselines/ppo2/ppo2.py/learn()](baselines/ppo2/ppo2.py#L152) for the description of the ppo2 hyperparamters.
### Example 2. DQN on Atari
DQN with Atari is at this point a classics of benchmarks. To run the baselines implementation of DQN on Atari Pong:
@@ -109,7 +109,7 @@ This should get to the mean reward per episode about 20. To load and visualize t
python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4 --num_timesteps=0 --load_path=~/models/pong_20M_ppo2 --play
```
*NOTE:* Mujoco environments require normalization to work properly, so we wrap them with VecNormalize wrapper. Currently, to ensure the models are saved with normalization (so that trained models can be restored and run without further training) the normalization coefficients are saved as tensorflow variables. This can decrease the performance somewhat, so if you require high-throughput steps with Mujoco and do not need saving/restoring the models, it may make sense to use numpy normalization instead. To do that, set 'use_tf=False` in [baselines/run.py](baselines/run.py#L116).
*NOTE:* At the moment Mujoco training uses VecNormalize wrapper for the environment which is not being saved correctly; so loading the models trained on Mujoco will not work well if the environment is recreated. If necessary, you can work around that by replacing RunningMeanStd by TfRunningMeanStd in [baselines/common/vec_env/vec_normalize.py](baselines/common/vec_env/vec_normalize.py#L12). This way, mean and std of environment normalizing wrapper will be saved in tensorflow variables and included in the model file; however, training is slower that way - hence not including it by default
## Loading and vizualizing learning curves and other training metrics
See [here](docs/viz/viz.ipynb) for instructions on how to load and display the training data.

View File

@@ -11,8 +11,6 @@ from baselines.common.policies import build_policy
from baselines.a2c.utils import Scheduler, find_trainable_variables
from baselines.a2c.runner import Runner
from baselines.ppo2.ppo2 import safemean
from collections import deque
from tensorflow import losses
@@ -197,7 +195,6 @@ def learn(
# Instantiate the runner object
runner = Runner(env, model, nsteps=nsteps, gamma=gamma)
epinfobuf = deque(maxlen=100)
# Calculate the batch_size
nbatch = nenvs*nsteps
@@ -207,8 +204,7 @@ def learn(
for update in range(1, total_timesteps//nbatch+1):
# Get mini batch of experiences
obs, states, rewards, masks, actions, values, epinfos = runner.run()
epinfobuf.extend(epinfos)
obs, states, rewards, masks, actions, values = runner.run()
policy_loss, value_loss, policy_entropy = model.train(obs, states, rewards, masks, actions, values)
nseconds = time.time()-tstart
@@ -225,8 +221,6 @@ def learn(
logger.record_tabular("policy_entropy", float(policy_entropy))
logger.record_tabular("value_loss", float(value_loss))
logger.record_tabular("explained_variance", float(ev))
logger.record_tabular("eprewmean", safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.record_tabular("eplenmean", safemean([epinfo['l'] for epinfo in epinfobuf]))
logger.dump_tabular()
return model

View File

@@ -22,7 +22,6 @@ class Runner(AbstractEnvRunner):
# We initialize the lists that will contain the mb of experiences
mb_obs, mb_rewards, mb_actions, mb_values, mb_dones = [],[],[],[],[]
mb_states = self.states
epinfos = []
for n in range(self.nsteps):
# Given observations, take action and value (V(s))
# We already have self.obs because Runner superclass run self.obs[:] = env.reset() on init
@@ -35,10 +34,7 @@ class Runner(AbstractEnvRunner):
mb_dones.append(self.dones)
# Take actions in env and look the results
obs, rewards, dones, infos = self.env.step(actions)
for info in infos:
maybeepinfo = info.get('episode')
if maybeepinfo: epinfos.append(maybeepinfo)
obs, rewards, dones, _ = self.env.step(actions)
self.states = states
self.dones = dones
self.obs = obs
@@ -73,4 +69,4 @@ class Runner(AbstractEnvRunner):
mb_rewards = mb_rewards.flatten()
mb_values = mb_values.flatten()
mb_masks = mb_masks.flatten()
return mb_obs, mb_states, mb_rewards, mb_masks, mb_actions, mb_values, epinfos
return mb_obs, mb_states, mb_rewards, mb_masks, mb_actions, mb_values

View File

@@ -11,8 +11,6 @@ from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.a2c.runner import Runner
from baselines.a2c.utils import Scheduler, find_trainable_variables
from baselines.acktr import kfac
from baselines.ppo2.ppo2 import safemean
from collections import deque
class Model(object):
@@ -92,7 +90,7 @@ class Model(object):
self.initial_state = step_model.initial_state
tf.global_variables_initializer().run(session=sess)
def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=100, nprocs=32, nsteps=20,
def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interval=1, nprocs=32, nsteps=20,
ent_coef=0.01, vf_coef=0.5, vf_fisher_coef=1.0, lr=0.25, max_grad_norm=0.5,
kfac_clip=0.001, save_interval=None, lrschedule='linear', load_path=None, is_async=True, **network_kwargs):
set_global_seeds(seed)
@@ -120,7 +118,6 @@ def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interva
model.load(load_path)
runner = Runner(env, model, nsteps=nsteps, gamma=gamma)
epinfobuf = deque(maxlen=100)
nbatch = nenvs*nsteps
tstart = time.time()
coord = tf.train.Coordinator()
@@ -130,8 +127,7 @@ def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interva
enqueue_threads = []
for update in range(1, total_timesteps//nbatch+1):
obs, states, rewards, masks, actions, values, epinfos = runner.run()
epinfobuf.extend(epinfos)
obs, states, rewards, masks, actions, values = runner.run()
policy_loss, value_loss, policy_entropy = model.train(obs, states, rewards, masks, actions, values)
model.old_obs = obs
nseconds = time.time()-tstart
@@ -145,8 +141,6 @@ def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interva
logger.record_tabular("policy_loss", float(policy_loss))
logger.record_tabular("value_loss", float(value_loss))
logger.record_tabular("explained_variance", float(ev))
logger.record_tabular("eprewmean", safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.record_tabular("eplenmean", safemean([epinfo['l'] for epinfo in epinfobuf]))
logger.dump_tabular()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir():

View File

@@ -11,7 +11,7 @@ KFAC_DEBUG = False
class KfacOptimizer():
# note that KfacOptimizer will be truly synchronous (and thus deterministic) only if a single-threaded session is used
def __init__(self, learning_rate=0.01, momentum=0.9, clip_kl=0.01, kfac_update=2, stats_accum_iter=60, full_stats_init=False, cold_iter=100, cold_lr=None, is_async=False, async_stats=False, epsilon=1e-2, stats_decay=0.95, blockdiag_bias=False, channel_fac=False, factored_damping=False, approxT2=False, use_float64=False, weight_decay_dict={},max_grad_norm=0.5):
self.max_grad_norm = max_grad_norm
self._lr = learning_rate

View File

@@ -20,7 +20,7 @@ def register_benchmark(benchmark):
if 'tasks' in benchmark:
for t in benchmark['tasks']:
if 'desc' not in t:
t['desc'] = remove_version_re.sub('', t.get('env_id', t.get('id')))
t['desc'] = remove_version_re.sub('', t['env_id'])
_BENCHMARKS.append(benchmark)

View File

@@ -16,13 +16,11 @@ class Monitor(Wrapper):
def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()):
Wrapper.__init__(self, env=env)
self.tstart = time.time()
if filename:
self.results_writer = ResultsWriter(filename,
header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id},
extra_keys=reset_keywords + info_keywords
)
else:
self.results_writer = None
self.results_writer = ResultsWriter(
filename,
header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id},
extra_keys=reset_keywords + info_keywords
)
self.reset_keywords = reset_keywords
self.info_keywords = info_keywords
self.allow_early_resets = allow_early_resets
@@ -70,9 +68,8 @@ class Monitor(Wrapper):
self.episode_lengths.append(eplen)
self.episode_times.append(time.time() - self.tstart)
epinfo.update(self.current_reset_info)
if self.results_writer:
self.results_writer.write_row(epinfo)
assert isinstance(info, dict)
self.results_writer.write_row(epinfo)
if isinstance(info, dict):
info['episode'] = epinfo
@@ -99,21 +96,24 @@ class LoadMonitorResultsError(Exception):
class ResultsWriter(object):
def __init__(self, filename, header='', extra_keys=()):
def __init__(self, filename=None, header='', extra_keys=()):
self.extra_keys = extra_keys
assert filename is not None
if not filename.endswith(Monitor.EXT):
if osp.isdir(filename):
filename = osp.join(filename, Monitor.EXT)
else:
filename = filename + "." + Monitor.EXT
self.f = open(filename, "wt")
if isinstance(header, dict):
header = '# {} \n'.format(json.dumps(header))
self.f.write(header)
self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
self.logger.writeheader()
self.f.flush()
if filename is None:
self.f = None
self.logger = None
else:
if not filename.endswith(Monitor.EXT):
if osp.isdir(filename):
filename = osp.join(filename, Monitor.EXT)
else:
filename = filename + "." + Monitor.EXT
self.f = open(filename, "wt")
if isinstance(header, dict):
header = '# {} \n'.format(json.dumps(header))
self.f.write(header)
self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
self.logger.writeheader()
self.f.flush()
def write_row(self, epinfo):
if self.logger:
@@ -121,6 +121,7 @@ class ResultsWriter(object):
self.f.flush()
def get_monitor_files(dir):
return glob(osp.join(dir, "*" + Monitor.EXT))

View File

@@ -6,8 +6,6 @@ import gym
from gym import spaces
import cv2
cv2.ocl.setUseOpenCL(False)
from .wrappers import TimeLimit
class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
@@ -130,60 +128,27 @@ class ClipRewardEnv(gym.RewardWrapper):
"""Bin reward to {+1, 0, -1} by its sign."""
return np.sign(reward)
class WarpFrame(gym.ObservationWrapper):
def __init__(self, env, width=84, height=84, grayscale=True, dict_space_key=None):
"""
Warp frames to 84x84 as done in the Nature paper and later work.
If the environment uses dictionary observations, `dict_space_key` can be specified which indicates which
observation should be warped.
"""
super().__init__(env)
self._width = width
self._height = height
self._grayscale = grayscale
self._key = dict_space_key
if self._grayscale:
num_colors = 1
def __init__(self, env, width=84, height=84, grayscale=True):
"""Warp frames to 84x84 as done in the Nature paper and later work."""
gym.ObservationWrapper.__init__(self, env)
self.width = width
self.height = height
self.grayscale = grayscale
if self.grayscale:
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 1), dtype=np.uint8)
else:
num_colors = 3
self.observation_space = spaces.Box(low=0, high=255,
shape=(self.height, self.width, 3), dtype=np.uint8)
new_space = gym.spaces.Box(
low=0,
high=255,
shape=(self._height, self._width, num_colors),
dtype=np.uint8,
)
if self._key is None:
original_space = self.observation_space
self.observation_space = new_space
else:
original_space = self.observation_space.spaces[self._key]
self.observation_space.spaces[self._key] = new_space
assert original_space.dtype == np.uint8 and len(original_space.shape) == 3
def observation(self, obs):
if self._key is None:
frame = obs
else:
frame = obs[self._key]
if self._grayscale:
def observation(self, frame):
if self.grayscale:
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(
frame, (self._width, self._height), interpolation=cv2.INTER_AREA
)
if self._grayscale:
frame = cv2.resize(frame, (self.width, self.height), interpolation=cv2.INTER_AREA)
if self.grayscale:
frame = np.expand_dims(frame, -1)
if self._key is None:
obs = frame
else:
obs = obs.copy()
obs[self._key] = frame
return obs
return frame
class FrameStack(gym.Wrapper):
def __init__(self, env, k):
@@ -254,15 +219,16 @@ class LazyFrames(object):
return len(self._force())
def __getitem__(self, i):
return self._force()[..., i]
return self._force()[i]
def make_atari(env_id, max_episode_steps=None):
def make_atari(env_id, timelimit=True):
# XXX(john): remove timelimit argument after gym is upgraded to allow double wrapping
env = gym.make(env_id)
if not timelimit:
env = env.env
assert 'NoFrameskip' in env.spec.id
env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
if max_episode_steps is not None:
env = TimeLimit(env, max_episode_steps=max_episode_steps)
return env
def wrap_deepmind(env, episode_life=True, clip_rewards=True, frame_stack=False, scale=False):

View File

@@ -17,22 +17,20 @@ from baselines.common.atari_wrappers import make_atari, wrap_deepmind
from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.common import retro_wrappers
from baselines.common.wrappers import ClipActionsWrapper
def make_vec_env(env_id, env_type, num_env, seed,
wrapper_kwargs=None,
env_kwargs=None,
start_index=0,
reward_scale=1.0,
flatten_dict_observations=True,
gamestate=None,
initializer=None,
env_kwargs=None,
force_dummy=False):
"""
Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo.
"""
wrapper_kwargs = wrapper_kwargs or {}
env_kwargs = env_kwargs or {}
mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
seed = seed + 10000 * mpi_rank if seed is not None else None
logger_dir = logger.get_dir()
@@ -47,9 +45,9 @@ def make_vec_env(env_id, env_type, num_env, seed,
gamestate=gamestate,
flatten_dict_observations=flatten_dict_observations,
wrapper_kwargs=wrapper_kwargs,
env_kwargs=env_kwargs,
logger_dir=logger_dir,
initializer=initializer
initializer=initializer,
env_kwargs=env_kwargs,
)
set_global_seeds(seed)
@@ -59,18 +57,11 @@ def make_vec_env(env_id, env_type, num_env, seed,
return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)])
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None, initializer=None):
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, logger_dir=None, initializer=None, env_kwargs=None):
if initializer is not None:
initializer(mpi_rank=mpi_rank, subrank=subrank)
wrapper_kwargs = wrapper_kwargs or {}
env_kwargs = env_kwargs or {}
if ':' in env_id:
import re
import importlib
module_name = re.sub(':.*','',env_id)
env_id = re.sub('.*:', '', env_id)
importlib.import_module(module_name)
if env_type == 'atari':
env = make_atari(env_id)
elif env_type == 'retro':
@@ -78,7 +69,7 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.
gamestate = gamestate or retro.State.DEFAULT
env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
else:
env = gym.make(env_id, **env_kwargs)
env = gym.make(env_id, **(env_kwargs or {}))
if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):
keys = env.observation_space.spaces.keys()
@@ -89,17 +80,11 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.
logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)),
allow_early_resets=True)
if env_type == 'atari':
env = wrap_deepmind(env, **wrapper_kwargs)
elif env_type == 'retro':
if 'frame_stack' not in wrapper_kwargs:
wrapper_kwargs['frame_stack'] = 1
env = retro_wrappers.wrap_deepmind_retro(env, **wrapper_kwargs)
if isinstance(env.action_space, gym.spaces.Box):
env = ClipActionsWrapper(env)
if reward_scale != 1:
env = retro_wrappers.RewardScaler(env, reward_scale)
@@ -171,6 +156,7 @@ def common_arg_parser():
parser.add_argument('--save_video_interval', help='Save video every x steps (0 = disabled)', default=0, type=int)
parser.add_argument('--save_video_length', help='Length of recorded video. Default: 200', default=200, type=int)
parser.add_argument('--play', default=False, action='store_true')
parser.add_argument('--extra_import', help='Extra module to import to access external environments', type=str, default=None)
return parser
def robotics_arg_parser():

View File

@@ -206,8 +206,7 @@ class CategoricalPd(Pd):
class MultiCategoricalPd(Pd):
def __init__(self, nvec, flat):
self.flat = flat
self.categoricals = list(map(CategoricalPd,
tf.split(flat, np.array(nvec, dtype=np.int32), axis=-1)))
self.categoricals = list(map(CategoricalPd, tf.split(flat, nvec, axis=-1)))
def flatparam(self):
return self.flat
def mode(self):
@@ -275,133 +274,6 @@ class BernoulliPd(Pd):
def fromflat(cls, flat):
return cls(flat)
def _np_cast(x, dtype):
"""Numpy cast, equivalent to tf.cast"""
return x.astype(dtype)
def decode_tuple_sample(pdtypes, x):
"""
Cast and convert a sample from its dense concatenated state back to constituent parts.
Arguments
---------
:param pdtypes: list<PdType>, a TuplePdType's child PdTypes.
:param x: np.ndarray or tf.Tensor.
Shape is [..., sum(pdtype.sample_shape for pdtype in pdtypes)]
:return output, list<np.ndarray> or list<tf.Tensor>, the split and correctly casted
policy samples.
"""
if isinstance(x, np.ndarray):
cast_fn = _np_cast
numpy_casting = True
else:
cast_fn = tf.cast
numpy_casting = False
so_far = 0
xs = []
for pdtype in pdtypes:
sample_size = pdtype.sample_shape()[0] if len(pdtype.sample_shape()) > 0 else 1
if len(pdtype.sample_shape()) == 0:
slided_x = x[..., so_far]
else:
slided_x = x[..., so_far:so_far + sample_size]
desired_dtype = pdtype.sample_dtype()
if numpy_casting:
desired_dtype = desired_dtype.as_numpy_dtype
if desired_dtype != x:
slided_x = cast_fn(slided_x, desired_dtype)
xs.append(slided_x)
so_far += sample_size
return xs
class TuplePd(Pd):
def __init__(self, sample_dtype, pdtypes, logits):
self.pdtypes = pdtypes
self.sample_dtype = sample_dtype
self.pds = []
so_far = 0
for pdtype in self.pdtypes:
param_shape = pdtype.param_shape()[0]
self.pds.append(pdtype.pdfromflat(logits[..., so_far:so_far + param_shape]))
so_far += param_shape
def flatparam(self):
return tf.concat([pd.flatparam() for pd in self.pds], axis=-1)
def mode(self):
return self.tuple_sample_concat([pd.mode() for pd in self.pds])
def tuple_sample_concat(self, samples):
out = []
for sample, pdtype in zip(samples, self.pdtypes):
if len(pdtype.sample_shape()) == 0:
sample = tf.expand_dims(sample, axis=-1)
if sample.dtype != self.sample_dtype:
sample = tf.cast(sample, self.sample_dtype)
out.append(sample)
return tf.concat(out, axis=-1)
def sample(self):
return self.tuple_sample_concat([pd.sample() for pd in self.pds])
def neglogp(self, x):
return tf.add_n([pd.neglogp(xi) for pd, xi in zip(self.pds, decode_tuple_sample(self.pdtypes, x))])
def entropy(self):
return tf.add_n([pd.entropy() for pd in self.pds])
def _dtype_promotion(old, new):
"""
Find the highest precision common ground between two tensorflow datatypes.
if old is None, it is ignored.
"""
if old is None or (new.is_floating and old.is_integer):
return new
if old.is_floating and old.is_integer:
return old
if (old.is_floating and new.is_floating) or (new.is_integer and new.is_integer):
# take the largest type (e.g. float64 over float32)
return old if old.size > new.size else new
raise ValueError("No idea how to promote {} and {}.".format(old, new))
class TuplePdType(PdType):
def __init__(self, space):
self.internal_pdtypes = [make_pdtype(space) for space in space.spaces]
def decode_sample(self, x):
return decode_tuple_sample(self.internal_pdtypes, x)
def pdclass(self):
return TuplePd
def pdfromflat(self, flat):
return TuplePd(self.sample_dtype(), self.internal_pdtypes, flat)
def param_shape(self):
return [sum([pdtype.param_shape()[0]
for pdtype in self.internal_pdtypes])]
def sample_shape(self):
return [sum([pdtype.sample_shape()[0] if len(pdtype.sample_shape()) > 0 else 1
for pdtype in self.internal_pdtypes])]
def sample_dtype(self):
dtype = None
for pdtype in self.internal_pdtypes:
dtype = _dtype_promotion(dtype, pdtype.sample_dtype())
return dtype
def make_pdtype(ac_space):
from gym import spaces
if isinstance(ac_space, spaces.Box):
@@ -413,12 +285,9 @@ def make_pdtype(ac_space):
return MultiCategoricalPdType(ac_space.nvec)
elif isinstance(ac_space, spaces.MultiBinary):
return BernoulliPdType(ac_space.n)
elif isinstance(ac_space, spaces.Tuple):
return TuplePdType(ac_space)
else:
raise NotImplementedError
def shape_el(v, i):
maybe = v.get_shape()[i]
if maybe is not None:

View File

@@ -13,6 +13,27 @@ def zipsame(*seqs):
return zip(*seqs)
def unpack(seq, sizes):
"""
Unpack 'seq' into a sequence of lists, with lengths specified by 'sizes'.
None = just one bare element, not a list
Example:
unpack([1,2,3,4,5,6], [3,None,2]) -> ([1,2,3], 4, [5,6])
"""
seq = list(seq)
it = iter(seq)
assert sum(1 if s is None else s for s in sizes) == len(seq), "Trying to unpack %s into %s" % (seq, sizes)
for size in sizes:
if size is None:
yield it.__next__()
else:
li = []
for _ in range(size):
li.append(it.__next__())
yield li
class EzPickle(object):
"""Objects that are pickled and unpickled via their constructor
arguments.

View File

@@ -3,6 +3,7 @@ import tensorflow as tf
from baselines.a2c import utils
from baselines.a2c.utils import conv, fc, conv_to_fc, batch_to_seq, seq_to_batch
from baselines.common.mpi_running_mean_std import RunningMeanStd
import tensorflow.contrib.layers as layers
mapping = {}
@@ -25,51 +26,6 @@ def nature_cnn(unscaled_images, **conv_kwargs):
h3 = conv_to_fc(h3)
return activ(fc(h3, 'fc1', nh=512, init_scale=np.sqrt(2)))
def build_impala_cnn(unscaled_images, depths=[16,32,32], **conv_kwargs):
"""
Model used in the paper "IMPALA: Scalable Distributed Deep-RL with
Importance Weighted Actor-Learner Architectures" https://arxiv.org/abs/1802.01561
"""
layer_num = 0
def get_layer_num_str():
nonlocal layer_num
num_str = str(layer_num)
layer_num += 1
return num_str
def conv_layer(out, depth):
return tf.layers.conv2d(out, depth, 3, padding='same', name='layer_' + get_layer_num_str())
def residual_block(inputs):
depth = inputs.get_shape()[-1].value
out = tf.nn.relu(inputs)
out = conv_layer(out, depth)
out = tf.nn.relu(out)
out = conv_layer(out, depth)
return out + inputs
def conv_sequence(inputs, depth):
out = conv_layer(inputs, depth)
out = tf.layers.max_pooling2d(out, pool_size=3, strides=2, padding='same')
out = residual_block(out)
out = residual_block(out)
return out
out = tf.cast(unscaled_images, tf.float32) / 255.
for depth in depths:
out = conv_sequence(out, depth)
out = tf.layers.flatten(out)
out = tf.nn.relu(out)
out = tf.layers.dense(out, 256, activation=tf.nn.relu, name='layer_' + get_layer_num_str())
return out
@register("mlp")
def mlp(num_layers=2, num_hidden=64, activation=tf.tanh, layer_norm=False):
@@ -109,11 +65,6 @@ def cnn(**conv_kwargs):
return nature_cnn(X, **conv_kwargs)
return network_fn
@register("impala_cnn")
def impala_cnn(**conv_kwargs):
def network_fn(X):
return build_impala_cnn(X)
return network_fn
@register("cnn_small")
def cnn_small(**conv_kwargs):
@@ -128,6 +79,7 @@ def cnn_small(**conv_kwargs):
return h
return network_fn
@register("lstm")
def lstm(nlstm=128, layer_norm=False):
"""
@@ -184,12 +136,12 @@ def lstm(nlstm=128, layer_norm=False):
@register("cnn_lstm")
def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs):
def cnn_lstm(nlstm=128, layer_norm=False, **conv_kwargs):
def network_fn(X, nenv=1):
nbatch = X.shape[0]
nsteps = nbatch // nenv
h = conv_fn(X, **conv_kwargs)
h = nature_cnn(X, **conv_kwargs)
M = tf.placeholder(tf.float32, [nbatch]) #mask (done t-1)
S = tf.placeholder(tf.float32, [nenv, 2*nlstm]) #states
@@ -209,9 +161,6 @@ def cnn_lstm(nlstm=128, layer_norm=False, conv_fn=nature_cnn, **conv_kwargs):
return network_fn
@register("impala_cnn_lstm")
def impala_cnn_lstm():
return cnn_lstm(nlstm=256, conv_fn=build_impala_cnn)
@register("cnn_lnlstm")
def cnn_lnlstm(nlstm=128, **conv_kwargs):
@@ -238,7 +187,7 @@ def conv_only(convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], **conv_kwargs):
out = tf.cast(X, tf.float32) / 255.
with tf.variable_scope("convnet"):
for num_outputs, kernel_size, stride in convs:
out = tf.contrib.layers.convolution2d(out,
out = layers.convolution2d(out,
num_outputs=num_outputs,
kernel_size=kernel_size,
stride=stride,

View File

@@ -1,90 +1,31 @@
import numpy as np
import tensorflow as tf
from baselines.common import tf_util as U
from baselines.common.tests.test_with_mpi import with_mpi
from baselines import logger
try:
from mpi4py import MPI
except ImportError:
MPI = None
from mpi4py import MPI
class MpiAdamOptimizer(tf.train.AdamOptimizer):
"""Adam optimizer that averages gradients across mpi processes."""
def __init__(self, comm, grad_clip=None, mpi_rank_weight=1, **kwargs):
def __init__(self, comm, **kwargs):
self.comm = comm
self.grad_clip = grad_clip
self.mpi_rank_weight = mpi_rank_weight
tf.train.AdamOptimizer.__init__(self, **kwargs)
def compute_gradients(self, loss, var_list, **kwargs):
grads_and_vars = tf.train.AdamOptimizer.compute_gradients(self, loss, var_list, **kwargs)
grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None]
flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0) * self.mpi_rank_weight
flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0)
shapes = [v.shape.as_list() for g, v in grads_and_vars]
sizes = [int(np.prod(s)) for s in shapes]
total_weight = np.zeros(1, np.float32)
self.comm.Allreduce(np.array([self.mpi_rank_weight], dtype=np.float32), total_weight, op=MPI.SUM)
total_weight = total_weight[0]
num_tasks = self.comm.Get_size()
buf = np.zeros(sum(sizes), np.float32)
countholder = [0] # Counts how many times _collect_grads has been called
stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable
def _collect_grads(flat_grad, np_stat):
if self.grad_clip is not None:
gradnorm = np.linalg.norm(flat_grad)
if gradnorm > 1:
flat_grad /= gradnorm
logger.logkv_mean('gradnorm', gradnorm)
logger.logkv_mean('gradclipfrac', float(gradnorm > 1))
def _collect_grads(flat_grad):
self.comm.Allreduce(flat_grad, buf, op=MPI.SUM)
np.divide(buf, float(total_weight), out=buf)
if countholder[0] % 100 == 0:
check_synced(np_stat, self.comm)
countholder[0] += 1
np.divide(buf, float(num_tasks), out=buf)
return buf
avg_flat_grad = tf.py_func(_collect_grads, [flat_grad, stat], tf.float32)
avg_flat_grad = tf.py_func(_collect_grads, [flat_grad], tf.float32)
avg_flat_grad.set_shape(flat_grad.shape)
avg_grads = tf.split(avg_flat_grad, sizes, axis=0)
avg_grads_and_vars = [(tf.reshape(g, v.shape), v)
for g, (_, v) in zip(avg_grads, grads_and_vars)]
return avg_grads_and_vars
def check_synced(localval, comm=None):
"""
It's common to forget to initialize your variables to the same values, or
(less commonly) if you update them in some other way than adam, to get them out of sync.
This function checks that variables on all MPI workers are the same, and raises
an AssertionError otherwise
Arguments:
comm: MPI communicator
localval: list of local variables (list of variables on current worker to be compared with the other workers)
"""
comm = comm or MPI.COMM_WORLD
vals = comm.gather(localval)
if comm.rank == 0:
assert all(val==vals[0] for val in vals[1:]),\
f'MpiAdamOptimizer detected that different workers have different weights: {vals}'
@with_mpi(timeout=5)
def test_nonfreeze():
np.random.seed(0)
tf.set_random_seed(0)
a = tf.Variable(np.random.randn(3).astype('float32'))
b = tf.Variable(np.random.randn(2,5).astype('float32'))
loss = tf.reduce_sum(tf.square(a)) + tf.reduce_sum(tf.sin(b))
stepsize = 1e-2
# for some reason the session config with inter_op_parallelism_threads was causing
# nested sess.run calls to freeze
config = tf.ConfigProto(inter_op_parallelism_threads=1)
sess = U.get_session(config=config)
update_op = MpiAdamOptimizer(comm=MPI.COMM_WORLD, learning_rate=stepsize).minimize(loss)
sess.run(tf.global_variables_initializer())
losslist_ref = []
for i in range(100):
l,_ = sess.run([loss, update_op])
print(i, l)
losslist_ref.append(l)

View File

@@ -1,16 +1,9 @@
from collections import defaultdict
from mpi4py import MPI
import os, numpy as np
import platform
import shutil
import subprocess
import warnings
import sys
try:
from mpi4py import MPI
except ImportError:
MPI = None
def sync_from_root(sess, variables, comm=None):
"""
@@ -20,10 +13,15 @@ def sync_from_root(sess, variables, comm=None):
variables: all parameter variables including optimizer's
"""
if comm is None: comm = MPI.COMM_WORLD
import tensorflow as tf
values = comm.bcast(sess.run(variables))
sess.run([tf.assign(var, val)
for (var, val) in zip(variables, values)])
rank = comm.Get_rank()
for var in variables:
if rank == 0:
comm.Bcast(sess.run(var))
else:
import tensorflow as tf
returned_var = np.empty(var.shape, dtype='float32')
comm.Bcast(returned_var)
sess.run(tf.assign(var, returned_var))
def gpu_count():
"""
@@ -36,15 +34,13 @@ def gpu_count():
def setup_mpi_gpus():
"""
Set CUDA_VISIBLE_DEVICES to MPI rank if not already set
Set CUDA_VISIBLE_DEVICES using MPI.
"""
if 'CUDA_VISIBLE_DEVICES' not in os.environ:
if sys.platform == 'darwin': # This Assumes if you're on OSX you're just
ids = [] # doing a smoke test and don't want GPUs
else:
lrank, _lsize = get_local_rank_size(MPI.COMM_WORLD)
ids = [lrank]
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, ids))
num_gpus = gpu_count()
if num_gpus == 0:
return
local_rank, _ = get_local_rank_size(MPI.COMM_WORLD)
os.environ['CUDA_VISIBLE_DEVICES'] = str(local_rank % num_gpus)
def get_local_rank_size(comm):
"""
@@ -85,9 +81,6 @@ def share_file(comm, path):
comm.Barrier()
def dict_gather(comm, d, op='mean', assert_all_have_data=True):
"""
Perform a reduction operation over dicts
"""
if comm is None: return d
alldicts = comm.allgather(d)
size = comm.size
@@ -106,28 +99,3 @@ def dict_gather(comm, d, op='mean', assert_all_have_data=True):
else:
assert 0, op
return result
def mpi_weighted_mean(comm, local_name2valcount):
"""
Perform a weighted average over dicts that are each on a different node
Input: local_name2valcount: dict mapping key -> (value, count)
Returns: key -> mean
"""
all_name2valcount = comm.gather(local_name2valcount)
if comm.rank == 0:
name2sum = defaultdict(float)
name2count = defaultdict(float)
for n2vc in all_name2valcount:
for (name, (val, count)) in n2vc.items():
try:
val = float(val)
except ValueError:
if comm.rank == 0:
warnings.warn('WARNING: tried to compute mean on non-float {}={}'.format(name, val))
else:
name2sum[name] += val * count
name2count[name] += count
return {name : name2sum[name] / name2count[name] for name in name2sum}
else:
return {}

View File

@@ -90,8 +90,6 @@ def one_sided_ema(xolds, yolds, low=None, high=None, n=512, decay_steps=1., low_
sum_y *= interstep_decay
count_y *= interstep_decay
while True:
if luoi >= len(xolds):
break
xold = xolds[luoi]
if xold <= xnew:
decay = np.exp(- (xnew - xold) / decay_period)
@@ -100,6 +98,8 @@ def one_sided_ema(xolds, yolds, low=None, high=None, n=512, decay_steps=1., low_
luoi += 1
else:
break
if luoi >= len(xolds):
break
sum_ys[i] = sum_y
count_ys[i] = count_y
@@ -249,9 +249,6 @@ def plot_results(
legend_outside=False,
resample=0,
smooth_step=1.0,
tiling='vertical',
xlabel=None,
ylabel=None
):
'''
Plot multiple Results objects
@@ -303,23 +300,9 @@ def plot_results(
sk2r[splitkey].append(result)
assert len(sk2r) > 0
assert isinstance(resample, int), "0: don't resample. <integer>: that many samples"
if tiling == 'vertical' or tiling is None:
nrows = len(sk2r)
ncols = 1
elif tiling == 'horizontal':
ncols = len(sk2r)
nrows = 1
elif tiling == 'symmetric':
import math
N = len(sk2r)
largest_divisor = 1
for i in range(1, int(math.sqrt(N))+1):
if N % i == 0:
largest_divisor = i
ncols = largest_divisor
nrows = N // ncols
figsize = figsize or (6 * ncols, 6 * nrows)
nrows = len(sk2r)
ncols = 1
figsize = figsize or (6, 6 * nrows)
f, axarr = plt.subplots(nrows, ncols, sharex=False, squeeze=False, figsize=figsize)
groups = list(set(group_fn(result) for result in allresults))
@@ -333,9 +316,7 @@ def plot_results(
g2c = defaultdict(int)
sresults = sk2r[sk]
gresults = defaultdict(list)
idx_row = isplit // ncols
idx_col = isplit % ncols
ax = axarr[idx_row][idx_col]
ax = axarr[isplit][0]
for result in sresults:
group = group_fn(result)
g2c[group] += 1
@@ -374,7 +355,7 @@ def plot_results(
ymean = np.mean(ys, axis=0)
ystd = np.std(ys, axis=0)
ystderr = ystd / np.sqrt(len(ys))
l, = axarr[idx_row][idx_col].plot(usex, ymean, color=color)
l, = axarr[isplit][0].plot(usex, ymean, color=color)
g2l[group] = l
if shaded_err:
ax.fill_between(usex, ymean - ystderr, ymean + ystderr, color=color, alpha=.4)
@@ -391,17 +372,6 @@ def plot_results(
loc=2 if legend_outside else None,
bbox_to_anchor=(1,1) if legend_outside else None)
ax.set_title(sk)
# add xlabels, but only to the bottom row
if xlabel is not None:
for ax in axarr[-1]:
plt.sca(ax)
plt.xlabel(xlabel)
# add ylabels, but only to left column
if ylabel is not None:
for ax in axarr[:,0]:
plt.sca(ax)
plt.ylabel(ylabel)
return f, axarr
def regression_analysis(df):

View File

@@ -1,11 +1,25 @@
from collections import deque
import cv2
cv2.ocl.setUseOpenCL(False)
from .atari_wrappers import WarpFrame, ClipRewardEnv, FrameStack, ScaledFloatFrame
from .wrappers import TimeLimit
# flake8: noqa F403, F405
from .atari_wrappers import *
import numpy as np
import gym
class TimeLimit(gym.Wrapper):
def __init__(self, env, max_episode_steps=None):
super(TimeLimit, self).__init__(env)
self._max_episode_steps = max_episode_steps
self._elapsed_steps = 0
def step(self, ac):
observation, reward, done, info = self.env.step(ac)
self._elapsed_steps += 1
if self._elapsed_steps >= self._max_episode_steps:
done = True
info['TimeLimit.truncated'] = True
return observation, reward, done, info
def reset(self, **kwargs):
self._elapsed_steps = 0
return self.env.reset(**kwargs)
class StochasticFrameSkip(gym.Wrapper):
def __init__(self, env, n, stickprob):
@@ -85,7 +99,7 @@ class Downsample(gym.ObservationWrapper):
gym.ObservationWrapper.__init__(self, env)
(oldh, oldw, oldc) = env.observation_space.shape
newshape = (oldh//ratio, oldw//ratio, oldc)
self.observation_space = gym.spaces.Box(low=0, high=255,
self.observation_space = spaces.Box(low=0, high=255,
shape=newshape, dtype=np.uint8)
def observation(self, frame):
@@ -102,7 +116,7 @@ class Rgb2gray(gym.ObservationWrapper):
"""
gym.ObservationWrapper.__init__(self, env)
(oldh, oldw, _oldc) = env.observation_space.shape
self.observation_space = gym.spaces.Box(low=0, high=255,
self.observation_space = spaces.Box(low=0, high=255,
shape=(oldh, oldw, 1), dtype=np.uint8)
def observation(self, frame):
@@ -199,10 +213,8 @@ class StartDoingRandomActionsWrapper(gym.Wrapper):
self.some_random_steps()
return self.last_obs, rew, done, info
def make_retro(*, game, state=None, max_episode_steps=4500, **kwargs):
def make_retro(*, game, state, max_episode_steps, **kwargs):
import retro
if state is None:
state = retro.State.DEFAULT
env = retro.make(game, state, **kwargs)
env = StochasticFrameSkip(env, n=4, stickprob=0.25)
if max_episode_steps is not None:
@@ -215,8 +227,7 @@ def wrap_deepmind_retro(env, scale=True, frame_stack=4):
"""
env = WarpFrame(env)
env = ClipRewardEnv(env)
if frame_stack > 1:
env = FrameStack(env, frame_stack)
env = FrameStack(env, frame_stack)
if scale:
env = ScaledFloatFrame(env)
return env

View File

@@ -177,7 +177,7 @@ def profile_tf_runningmeanstd():
outfile = '/tmp/timeline.json'
with open(outfile, 'wt') as f:
f.write(chrome_trace)
print('Successfully saved profile to {}. Exiting.'.format(outfile))
print(f'Successfully saved profile to {outfile}. Exiting.')
exit(0)
'''

View File

@@ -1,29 +0,0 @@
from baselines.common import mpi_util
from baselines import logger
from baselines.common.tests.test_with_mpi import with_mpi
try:
from mpi4py import MPI
except ImportError:
MPI = None
@with_mpi()
def test_mpi_weighted_mean():
comm = MPI.COMM_WORLD
with logger.scoped_configure(comm=comm):
if comm.rank == 0:
name2valcount = {'a' : (10, 2), 'b' : (20,3)}
elif comm.rank == 1:
name2valcount = {'a' : (19, 1), 'c' : (42,3)}
else:
raise NotImplementedError
d = mpi_util.mpi_weighted_mean(comm, name2valcount)
correctval = {'a' : (10 * 2 + 19) / 3.0, 'b' : 20, 'c' : 42}
if comm.rank == 0:
assert d == correctval, '{} != {}'.format(d, correctval)
for name, (val, count) in name2valcount.items():
for _ in range(count):
logger.logkv_mean(name, val)
d2 = logger.dumpkvs()
if comm.rank == 0:
assert d2 == correctval

View File

@@ -1,2 +0,0 @@
import os, pytest
mark_slow = pytest.mark.skipif(not os.getenv('RUNSLOW'), reason='slow')

View File

@@ -7,16 +7,19 @@ class FixedSequenceEnv(Env):
def __init__(
self,
n_actions=10,
seed=0,
episode_len=100
):
self.np_random = np.random.RandomState()
self.np_random.seed(seed)
self.sequence = [self.np_random.randint(0, n_actions-1) for _ in range(episode_len)]
self.action_space = Discrete(n_actions)
self.observation_space = Discrete(1)
self.np_random = np.random.RandomState(0)
self.episode_len = episode_len
self.sequence = [self.np_random.randint(0, self.action_space.n)
for _ in range(self.episode_len)]
self.time = 0
self.episode_len = episode_len
self.time = 0
self.reset()
def reset(self):
self.time = 0
@@ -27,13 +30,11 @@ class FixedSequenceEnv(Env):
self._choose_next_state()
done = False
if self.episode_len and self.time >= self.episode_len:
rew = 0
done = True
return 0, rew, done, {}
def seed(self, seed=None):
self.np_random.seed(seed)
def _choose_next_state(self):
self.time += 1

View File

@@ -2,45 +2,41 @@ import numpy as np
from abc import abstractmethod
from gym import Env
from gym.spaces import MultiDiscrete, Discrete, Box
from collections import deque
class IdentityEnv(Env):
def __init__(
self,
episode_len=None,
delay=0,
zero_first_rewards=True
episode_len=None
):
self.observation_space = self.action_space
self.episode_len = episode_len
self.time = 0
self.delay = delay
self.zero_first_rewards = zero_first_rewards
self.q = deque(maxlen=delay+1)
self.reset()
def reset(self):
self.q.clear()
for _ in range(self.delay + 1):
self.q.append(self.action_space.sample())
self._choose_next_state()
self.time = 0
self.observation_space = self.action_space
return self.q[-1]
return self.state
def step(self, actions):
rew = self._get_reward(self.q.popleft(), actions)
if self.zero_first_rewards and self.time < self.delay:
rew = self._get_reward(actions)
self._choose_next_state()
done = False
if self.episode_len and self.time >= self.episode_len:
rew = 0
self.q.append(self.action_space.sample())
self.time += 1
done = self.episode_len is not None and self.time >= self.episode_len
return self.q[-1], rew, done, {}
done = True
def seed(self, seed=None):
self.action_space.seed(seed)
return self.state, rew, done, {}
def _choose_next_state(self):
self.state = self.action_space.sample()
self.time += 1
@abstractmethod
def _get_reward(self, state, actions):
def _get_reward(self, actions):
raise NotImplementedError
@@ -49,29 +45,26 @@ class DiscreteIdentityEnv(IdentityEnv):
self,
dim,
episode_len=None,
delay=0,
zero_first_rewards=True
):
self.action_space = Discrete(dim)
super().__init__(episode_len=episode_len, delay=delay, zero_first_rewards=zero_first_rewards)
super().__init__(episode_len=episode_len)
def _get_reward(self, state, actions):
return 1 if state == actions else 0
def _get_reward(self, actions):
return 1 if self.state == actions else 0
class MultiDiscreteIdentityEnv(IdentityEnv):
def __init__(
self,
dims,
episode_len=None,
delay=0,
):
self.action_space = MultiDiscrete(dims)
super().__init__(episode_len=episode_len, delay=delay)
super().__init__(episode_len=episode_len)
def _get_reward(self, state, actions):
return 1 if all(state == actions) else 0
def _get_reward(self, actions):
return 1 if all(self.state == actions) else 0
class BoxIdentityEnv(IdentityEnv):
@@ -81,10 +74,10 @@ class BoxIdentityEnv(IdentityEnv):
episode_len=None,
):
self.action_space = Box(low=-1.0, high=1.0, shape=shape, dtype=np.float32)
self.action_space = Box(low=-1.0, high=1.0, shape=shape)
super().__init__(episode_len=episode_len)
def _get_reward(self, state, actions):
diff = actions - state
def _get_reward(self, actions):
diff = actions - self.state
diff = diff[:]
return -0.5 * np.dot(diff, diff)

View File

@@ -1,36 +0,0 @@
from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv
def test_discrete_nodelay():
nsteps = 100
eplen = 50
env = DiscreteIdentityEnv(10, episode_len=eplen)
ob = env.reset()
for t in range(nsteps):
action = env.action_space.sample()
next_ob, rew, done, info = env.step(action)
assert rew == (1 if action == ob else 0)
if (t + 1) % eplen == 0:
assert done
next_ob = env.reset()
else:
assert not done
ob = next_ob
def test_discrete_delay1():
eplen = 50
env = DiscreteIdentityEnv(10, episode_len=eplen, delay=1)
ob = env.reset()
prev_ob = None
for t in range(eplen):
action = env.action_space.sample()
next_ob, rew, done, info = env.step(action)
if t > 0:
assert rew == (1 if action == prev_ob else 0)
else:
assert rew == 0
prev_ob = ob
ob = next_ob
if t < eplen - 1:
assert not done
assert done

View File

@@ -9,6 +9,7 @@ from gym.spaces import Discrete, Box
class MnistEnv(Env):
def __init__(
self,
seed=0,
episode_len=None,
no_images=None
):
@@ -22,6 +23,7 @@ class MnistEnv(Env):
self.mnist = input_data.read_data_sets(mnist_path)
self.np_random = np.random.RandomState()
self.np_random.seed(seed)
self.observation_space = Box(low=0.0, high=1.0, shape=(28,28,1))
self.action_space = Discrete(10)
@@ -48,9 +50,6 @@ class MnistEnv(Env):
return self.state[0], rew, done, {}
def seed(self, seed=None):
self.np_random.seed(seed)
def train_mode(self):
self.dataset = self.mnist.train

View File

@@ -3,7 +3,6 @@ import gym
from baselines.run import get_learn_function
from baselines.common.tests.util import reward_per_episode_test
from baselines.common.tests import mark_slow
common_kwargs = dict(
total_timesteps=30000,
@@ -21,7 +20,7 @@ learn_kwargs = {
'trpo_mpi': {}
}
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", learn_kwargs.keys())
def test_cartpole(alg):
'''

View File

@@ -3,7 +3,6 @@ import gym
from baselines.run import get_learn_function
from baselines.common.tests.util import reward_per_episode_test
from baselines.common.tests import mark_slow
pytest.importorskip('mujoco_py')
@@ -16,7 +15,7 @@ learn_kwargs = {
'her': dict(total_timesteps=2000)
}
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", learn_kwargs.keys())
def test_fetchreach(alg):
'''

View File

@@ -3,8 +3,6 @@ from baselines.common.tests.envs.fixed_sequence_env import FixedSequenceEnv
from baselines.common.tests.util import simple_test
from baselines.run import get_learn_function
from baselines.common.tests import mark_slow
common_kwargs = dict(
seed=0,
@@ -23,7 +21,7 @@ learn_kwargs = {
alg_list = learn_kwargs.keys()
rnn_list = ['lstm']
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", alg_list)
@pytest.mark.parametrize("rnn", rnn_list)
def test_fixed_sequence(alg, rnn):
@@ -35,7 +33,8 @@ def test_fixed_sequence(alg, rnn):
kwargs = learn_kwargs[alg]
kwargs.update(common_kwargs)
env_fn = lambda: FixedSequenceEnv(n_actions=10, episode_len=5)
episode_len = 5
env_fn = lambda: FixedSequenceEnv(10, episode_len=episode_len)
learn = lambda e: get_learn_function(alg)(
env=e,
network=rnn,

View File

@@ -2,7 +2,6 @@ import pytest
from baselines.common.tests.envs.identity_env import DiscreteIdentityEnv, BoxIdentityEnv, MultiDiscreteIdentityEnv
from baselines.run import get_learn_function
from baselines.common.tests.util import simple_test
from baselines.common.tests import mark_slow
common_kwargs = dict(
total_timesteps=30000,
@@ -25,7 +24,7 @@ algos_disc = ['a2c', 'acktr', 'deepq', 'ppo2', 'trpo_mpi']
algos_multidisc = ['a2c', 'acktr', 'ppo2', 'trpo_mpi']
algos_cont = ['a2c', 'acktr', 'ddpg', 'ppo2', 'trpo_mpi']
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", algos_disc)
def test_discrete_identity(alg):
'''
@@ -40,7 +39,7 @@ def test_discrete_identity(alg):
env_fn = lambda: DiscreteIdentityEnv(10, episode_len=100)
simple_test(env_fn, learn_fn, 0.9)
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", algos_multidisc)
def test_multidiscrete_identity(alg):
'''
@@ -55,7 +54,7 @@ def test_multidiscrete_identity(alg):
env_fn = lambda: MultiDiscreteIdentityEnv((3,3), episode_len=100)
simple_test(env_fn, learn_fn, 0.9)
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", algos_cont)
def test_continuous_identity(alg):
'''

View File

@@ -4,7 +4,7 @@ import pytest
from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.tests.util import simple_test
from baselines.run import get_learn_function
from baselines.common.tests import mark_slow
# TODO investigate a2c and ppo2 failures - is it due to bad hyperparameters for this problem?
# GitHub issue https://github.com/openai/baselines/issues/189
@@ -28,7 +28,7 @@ learn_args = {
#tests pass, but are too slow on travis. Same algorithms are covered
# by other tests with less compute-hungry nn's and by benchmarks
@pytest.mark.skip
@mark_slow
@pytest.mark.slow
@pytest.mark.parametrize("alg", learn_args.keys())
def test_mnist(alg):
'''
@@ -41,7 +41,7 @@ def test_mnist(alg):
learn = get_learn_function(alg)
learn_fn = lambda e: learn(env=e, **learn_kwargs)
env_fn = lambda: MnistEnv(episode_len=100)
env_fn = lambda: MnistEnv(seed=0, episode_len=100)
simple_test(env_fn, learn_fn, 0.6)

View File

@@ -1,17 +0,0 @@
# smoke tests of plot_util
from baselines.common import plot_util as pu
from baselines.common.tests.util import smoketest
def test_plot_util():
nruns = 4
logdirs = [smoketest('--alg=ppo2 --env=CartPole-v0 --num_timesteps=10000') for _ in range(nruns)]
data = pu.load_results(logdirs)
assert len(data) == 4
_, axes = pu.plot_results(data[:1]); assert len(axes) == 1
_, axes = pu.plot_results(data, tiling='vertical'); assert axes.shape==(4,1)
_, axes = pu.plot_results(data, tiling='horizontal'); assert axes.shape==(1,4)
_, axes = pu.plot_results(data, tiling='symmetric'); assert axes.shape==(2,2)
_, axes = pu.plot_results(data, split_fn=lambda _: ''); assert len(axes) == 1

View File

@@ -44,12 +44,7 @@ def test_serialization(learn_fn, network_fn):
# github issue: https://github.com/openai/baselines/issues/660
return
def make_env():
env = MnistEnv(episode_len=100)
env.seed(10)
return env
env = DummyVecEnv([make_env])
env = DummyVecEnv([lambda: MnistEnv(10, episode_len=100)])
ob = env.reset().copy()
learn = get_learn_function(learn_fn)

View File

@@ -1,38 +0,0 @@
import os
import sys
import subprocess
import cloudpickle
import base64
import pytest
from functools import wraps
try:
from mpi4py import MPI
except ImportError:
MPI = None
def with_mpi(nproc=2, timeout=30, skip_if_no_mpi=True):
def outer_thunk(fn):
@wraps(fn)
def thunk(*args, **kwargs):
serialized_fn = base64.b64encode(cloudpickle.dumps(lambda: fn(*args, **kwargs)))
subprocess.check_call([
'mpiexec','-n', str(nproc),
sys.executable,
'-m', 'baselines.common.tests.test_with_mpi',
serialized_fn
], env=os.environ, timeout=timeout)
if skip_if_no_mpi:
return pytest.mark.skipif(MPI is None, reason="MPI not present")(thunk)
else:
return thunk
return outer_thunk
if __name__ == '__main__':
if len(sys.argv) > 1:
fn = cloudpickle.loads(base64.b64decode(sys.argv[1]))
assert callable(fn)
fn()

View File

@@ -1,50 +1,56 @@
import tensorflow as tf
import numpy as np
from gym.spaces import np_random
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
N_TRIALS = 10000
N_EPISODES = 100
_sess_config = tf.ConfigProto(
allow_soft_placement=True,
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1
)
def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS):
def seeded_env_fn():
env = env_fn()
env.seed(0)
return env
np.random.seed(0)
env = DummyVecEnv([seeded_env_fn])
with tf.Graph().as_default(), tf.Session(config=_sess_config).as_default():
np_random.seed(0)
env = DummyVecEnv([env_fn])
with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default():
tf.set_random_seed(0)
model = learn_fn(env)
sum_rew = 0
done = True
for i in range(n_trials):
if done:
obs = env.reset()
state = model.initial_state
if state is not None:
a, v, state, _ = model.step(obs, S=state, M=[False])
else:
a, v, _, _ = model.step(obs)
obs, rew, done, _ = env.step(a)
sum_rew += float(rew)
print("Reward in {} trials is {}".format(n_trials, sum_rew))
assert sum_rew > min_reward_fraction * n_trials, \
'sum of rewards {} is less than {} of the total number of trials {}'.format(sum_rew, min_reward_fraction, n_trials)
def reward_per_episode_test(env_fn, learn_fn, min_avg_reward, n_trials=N_EPISODES):
env = DummyVecEnv([env_fn])
with tf.Graph().as_default(), tf.Session(config=_sess_config).as_default():
with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default():
model = learn_fn(env)
N_TRIALS = 100
observations, actions, rewards = rollout(env, model, N_TRIALS)
rewards = [sum(r) for r in rewards]
avg_rew = sum(rewards) / N_TRIALS
print("Average reward in {} episodes is {}".format(n_trials, avg_rew))
assert avg_rew > min_avg_reward, \
@@ -54,12 +60,14 @@ def rollout(env, model, n_trials):
rewards = []
actions = []
observations = []
for i in range(n_trials):
obs = env.reset()
state = model.initial_state if hasattr(model, 'initial_state') else None
episode_rew = []
episode_actions = []
episode_obs = []
while True:
if state is not None:
a, v, state, _ = model.step(obs, S=state, M=[False])
@@ -67,26 +75,17 @@ def rollout(env, model, n_trials):
a,v, _, _ = model.step(obs)
obs, rew, done, _ = env.step(a)
episode_rew.append(rew)
episode_actions.append(a)
episode_obs.append(obs)
if done:
break
rewards.append(episode_rew)
actions.append(episode_actions)
observations.append(episode_obs)
return observations, actions, rewards
def smoketest(argstr, **kwargs):
import tempfile
import subprocess
import os
argstr = 'python -m baselines.run ' + argstr
for key, value in kwargs:
argstr += ' --{}={}'.format(key, value)
tempdir = tempfile.mkdtemp()
env = os.environ.copy()
env['OPENAI_LOGDIR'] = tempdir
subprocess.run(argstr.split(' '), env=env)
return tempdir

View File

@@ -1,3 +1,4 @@
import joblib
import numpy as np
import tensorflow as tf # pylint: ignore-module
import copy
@@ -305,17 +306,12 @@ def display_var_info(vars):
logger.info("Total model parameters: %0.2f million" % (count_params*1e-6))
def get_available_gpus(session_config=None):
# based on recipe from https://stackoverflow.com/a/38580201
# Unless we allocate a session here, subsequent attempts to create one
# will ignore our custom config (in particular, allow_growth=True will have
# no effect).
if session_config is None:
session_config = get_session()._config
def get_available_gpus():
# recipe from here:
# https://stackoverflow.com/questions/38559755/how-to-get-current-available-gpus-in-tensorflow?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa
from tensorflow.python.client import device_lib
local_device_protos = device_lib.list_local_devices(session_config)
local_device_protos = device_lib.list_local_devices()
return [x.name for x in local_device_protos if x.device_type == 'GPU']
# ================================================================
@@ -343,7 +339,6 @@ def save_state(fname, sess=None):
# TODO: ensure there is no subtle differences and remove one
def save_variables(save_path, variables=None, sess=None):
import joblib
sess = sess or get_session()
variables = variables or tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
@@ -355,7 +350,6 @@ def save_variables(save_path, variables=None, sess=None):
joblib.dump(save_dict, save_path)
def load_variables(load_path, variables=None, sess=None):
import joblib
sess = sess or get_session()
variables = variables or tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)

View File

@@ -1,10 +1,185 @@
from .vec_env import AlreadySteppingError, NotSteppingError, VecEnv, VecEnvWrapper, VecEnvObservationWrapper, CloudpickleWrapper
from .dummy_vec_env import DummyVecEnv
from .shmem_vec_env import ShmemVecEnv
from .subproc_vec_env import SubprocVecEnv
from .vec_frame_stack import VecFrameStack
from .vec_monitor import VecMonitor
from .vec_normalize import VecNormalize
from .vec_remove_dict_obs import VecExtractDictObs
from abc import ABC, abstractmethod
from baselines.common.tile_images import tile_images
__all__ = ['AlreadySteppingError', 'NotSteppingError', 'VecEnv', 'VecEnvWrapper', 'VecEnvObservationWrapper', 'CloudpickleWrapper', 'DummyVecEnv', 'ShmemVecEnv', 'SubprocVecEnv', 'VecFrameStack', 'VecMonitor', 'VecNormalize', 'VecExtractDictObs']
class AlreadySteppingError(Exception):
"""
Raised when an asynchronous step is running while
step_async() is called again.
"""
def __init__(self):
msg = 'already running an async step'
Exception.__init__(self, msg)
class NotSteppingError(Exception):
"""
Raised when an asynchronous step is not running but
step_wait() is called.
"""
def __init__(self):
msg = 'not running an async step'
Exception.__init__(self, msg)
class VecEnv(ABC):
"""
An abstract asynchronous, vectorized environment.
Used to batch data from multiple copies of an environment, so that
each observation becomes an batch of observations, and expected action is a batch of actions to
be applied per-environment.
"""
closed = False
viewer = None
metadata = {
'render.modes': ['human', 'rgb_array']
}
def __init__(self, num_envs, observation_space, action_space):
self.num_envs = num_envs
self.observation_space = observation_space
self.action_space = action_space
@abstractmethod
def reset(self):
"""
Reset all the environments and return an array of
observations, or a dict of observation arrays.
If step_async is still doing work, that work will
be cancelled and step_wait() should not be called
until step_async() is invoked again.
"""
pass
@abstractmethod
def step_async(self, actions):
"""
Tell all the environments to start taking a step
with the given actions.
Call step_wait() to get the results of the step.
You should not call this if a step_async run is
already pending.
"""
pass
@abstractmethod
def step_wait(self):
"""
Wait for the step taken with step_async().
Returns (obs, rews, dones, infos):
- obs: an array of observations, or a dict of
arrays of observations.
- rews: an array of rewards
- dones: an array of "episode done" booleans
- infos: a sequence of info objects
"""
pass
def close_extras(self):
"""
Clean up the extra resources, beyond what's in this base class.
Only runs when not self.closed.
"""
pass
def close(self):
if self.closed:
return
if self.viewer is not None:
self.viewer.close()
self.close_extras()
self.closed = True
def step(self, actions):
"""
Step the environments synchronously.
This is available for backwards compatibility.
"""
self.step_async(actions)
return self.step_wait()
def render(self, mode='human'):
imgs = self.get_images()
bigimg = tile_images(imgs)
if mode == 'human':
self.get_viewer().imshow(bigimg)
return self.get_viewer().isopen
elif mode == 'rgb_array':
return bigimg
else:
raise NotImplementedError
def get_images(self):
"""
Return RGB images from each environment
"""
raise NotImplementedError
@property
def unwrapped(self):
if isinstance(self, VecEnvWrapper):
return self.venv.unwrapped
else:
return self
def get_viewer(self):
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.SimpleImageViewer()
return self.viewer
class VecEnvWrapper(VecEnv):
"""
An environment wrapper that applies to an entire batch
of environments at once.
"""
def __init__(self, venv, observation_space=None, action_space=None):
self.venv = venv
VecEnv.__init__(self,
num_envs=venv.num_envs,
observation_space=observation_space or venv.observation_space,
action_space=action_space or venv.action_space)
def step_async(self, actions):
self.venv.step_async(actions)
@abstractmethod
def reset(self):
pass
@abstractmethod
def step_wait(self):
pass
def close(self):
return self.venv.close()
def render(self, mode='human'):
return self.venv.render(mode=mode)
def get_images(self):
return self.venv.get_images()
class CloudpickleWrapper(object):
"""
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
"""
def __init__(self, x):
self.x = x
def __getstate__(self):
import cloudpickle
return cloudpickle.dumps(self.x)
def __setstate__(self, ob):
import pickle
self.x = pickle.loads(ob)

View File

@@ -1,5 +1,6 @@
import numpy as np
from .vec_env import VecEnv
from gym import spaces
from . import VecEnv
from .util import copy_obs_dict, dict_to_obs, obs_space_info
class DummyVecEnv(VecEnv):
@@ -26,7 +27,7 @@ class DummyVecEnv(VecEnv):
self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
self.buf_infos = [{} for _ in range(self.num_envs)]
self.actions = None
self.spec = self.envs[0].spec
self.specs = [e.spec for e in self.envs]
def step_async(self, actions):
listify = True
@@ -45,8 +46,8 @@ class DummyVecEnv(VecEnv):
def step_wait(self):
for e in range(self.num_envs):
action = self.actions[e]
# if isinstance(self.envs[e].action_space, spaces.Discrete):
# action = int(action)
if isinstance(self.envs[e].action_space, spaces.Discrete):
action = int(action)
obs, self.buf_rews[e], self.buf_dones[e], self.buf_infos[e] = self.envs[e].step(action)
if self.buf_dones[e]:

View File

@@ -2,9 +2,9 @@
An interface for asynchronous vectorized environments.
"""
import multiprocessing as mp
from multiprocessing import Pipe, Array, Process
import numpy as np
from .vec_env import VecEnv, CloudpickleWrapper, clear_mpi_env_vars
from . import VecEnv, CloudpickleWrapper
import ctypes
from baselines import logger
@@ -22,12 +22,11 @@ class ShmemVecEnv(VecEnv):
Optimized version of SubprocVecEnv that uses shared variables to communicate observations.
"""
def __init__(self, env_fns, spaces=None, context='spawn'):
def __init__(self, env_fns, spaces=None):
"""
If you don't specify observation_space, we'll have to create a dummy
environment to get it.
"""
ctx = mp.get_context(context)
if spaces:
observation_space, action_space = spaces
else:
@@ -40,22 +39,22 @@ class ShmemVecEnv(VecEnv):
VecEnv.__init__(self, len(env_fns), observation_space, action_space)
self.obs_keys, self.obs_shapes, self.obs_dtypes = obs_space_info(observation_space)
self.obs_bufs = [
{k: ctx.Array(_NP_TO_CT[self.obs_dtypes[k].type], int(np.prod(self.obs_shapes[k]))) for k in self.obs_keys}
{k: Array(_NP_TO_CT[self.obs_dtypes[k].type], int(np.prod(self.obs_shapes[k]))) for k in self.obs_keys}
for _ in env_fns]
self.parent_pipes = []
self.procs = []
with clear_mpi_env_vars():
for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
wrapped_fn = CloudpickleWrapper(env_fn)
parent_pipe, child_pipe = ctx.Pipe()
proc = ctx.Process(target=_subproc_worker,
args=(child_pipe, parent_pipe, wrapped_fn, obs_buf, self.obs_shapes, self.obs_dtypes, self.obs_keys))
proc.daemon = True
self.procs.append(proc)
self.parent_pipes.append(parent_pipe)
proc.start()
child_pipe.close()
for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
wrapped_fn = CloudpickleWrapper(env_fn)
parent_pipe, child_pipe = Pipe()
proc = Process(target=_subproc_worker,
args=(child_pipe, parent_pipe, wrapped_fn, obs_buf, self.obs_shapes, self.obs_dtypes, self.obs_keys))
proc.daemon = True
self.procs.append(proc)
self.parent_pipes.append(parent_pipe)
proc.start()
child_pipe.close()
self.waiting_step = False
self.specs = [f().spec for f in env_fns]
self.viewer = None
def reset(self):

View File

@@ -1,8 +1,6 @@
import multiprocessing as mp
import numpy as np
from .vec_env import VecEnv, CloudpickleWrapper, clear_mpi_env_vars
from multiprocessing import Process, Pipe
from . import VecEnv, CloudpickleWrapper
def worker(remote, parent_remote, env_fn_wrapper):
parent_remote.close()
@@ -23,8 +21,8 @@ def worker(remote, parent_remote, env_fn_wrapper):
elif cmd == 'close':
remote.close()
break
elif cmd == 'get_spaces_spec':
remote.send((env.observation_space, env.action_space, env.spec))
elif cmd == 'get_spaces':
remote.send((env.observation_space, env.action_space))
else:
raise NotImplementedError
except KeyboardInterrupt:
@@ -38,7 +36,7 @@ class SubprocVecEnv(VecEnv):
VecEnv that runs multiple environments in parallel in subproceses and communicates with them via pipes.
Recommended to use when num_envs > 1 and step() can be a bottleneck.
"""
def __init__(self, env_fns, spaces=None, context='spawn'):
def __init__(self, env_fns, spaces=None):
"""
Arguments:
@@ -47,20 +45,19 @@ class SubprocVecEnv(VecEnv):
self.waiting = False
self.closed = False
nenvs = len(env_fns)
ctx = mp.get_context(context)
self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(nenvs)])
self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
self.remotes, self.work_remotes = zip(*[Pipe() for _ in range(nenvs)])
self.ps = [Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
for p in self.ps:
p.daemon = True # if the main process crashes, we should not cause things to hang
with clear_mpi_env_vars():
p.start()
p.start()
for remote in self.work_remotes:
remote.close()
self.remotes[0].send(('get_spaces_spec', None))
observation_space, action_space, self.spec = self.remotes[0].recv()
self.remotes[0].send(('get_spaces', None))
observation_space, action_space = self.remotes[0].recv()
self.viewer = None
self.specs = [f().spec for f in env_fns]
VecEnv.__init__(self, len(env_fns), observation_space, action_space)
def step_async(self, actions):
@@ -102,16 +99,16 @@ class SubprocVecEnv(VecEnv):
def _assert_not_closed(self):
assert not self.closed, "Trying to operate on a SubprocVecEnv after calling close()"
def __del__(self):
if not self.closed:
self.close()
def _flatten_obs(obs):
assert isinstance(obs, (list, tuple))
assert isinstance(obs, list) or isinstance(obs, tuple)
assert len(obs) > 0
if isinstance(obs[0], dict):
import collections
assert isinstance(obs, collections.OrderedDict)
keys = obs[0].keys()
return {k: np.stack([o[k] for o in obs]) for k in keys}
else:
return np.stack(obs)

View File

@@ -8,40 +8,39 @@ import pytest
from .dummy_vec_env import DummyVecEnv
from .shmem_vec_env import ShmemVecEnv
from .subproc_vec_env import SubprocVecEnv
from baselines.common.tests.test_with_mpi import with_mpi
def assert_venvs_equal(venv1, venv2, num_steps):
def assert_envs_equal(env1, env2, num_steps):
"""
Compare two environments over num_steps steps and make sure
that the observations produced by each are the same when given
the same actions.
"""
assert venv1.num_envs == venv2.num_envs
assert venv1.observation_space.shape == venv2.observation_space.shape
assert venv1.observation_space.dtype == venv2.observation_space.dtype
assert venv1.action_space.shape == venv2.action_space.shape
assert venv1.action_space.dtype == venv2.action_space.dtype
assert env1.num_envs == env2.num_envs
assert env1.action_space.shape == env2.action_space.shape
assert env1.action_space.dtype == env2.action_space.dtype
joint_shape = (env1.num_envs,) + env1.action_space.shape
try:
obs1, obs2 = venv1.reset(), venv2.reset()
obs1, obs2 = env1.reset(), env2.reset()
assert np.array(obs1).shape == np.array(obs2).shape
assert np.array(obs1).shape == (venv1.num_envs,) + venv1.observation_space.shape
assert np.array(obs1).shape == joint_shape
assert np.allclose(obs1, obs2)
venv1.action_space.seed(1337)
np.random.seed(1337)
for _ in range(num_steps):
actions = np.array([venv1.action_space.sample() for _ in range(venv1.num_envs)])
for venv in [venv1, venv2]:
venv.step_async(actions)
outs1 = venv1.step_wait()
outs2 = venv2.step_wait()
actions = np.array(np.random.randint(0, 0x100, size=joint_shape),
dtype=env1.action_space.dtype)
for env in [env1, env2]:
env.step_async(actions)
outs1 = env1.step_wait()
outs2 = env2.step_wait()
for out1, out2 in zip(outs1[:3], outs2[:3]):
assert np.array(out1).shape == np.array(out2).shape
assert np.allclose(out1, out2)
assert list(outs1[3]) == list(outs2[3])
finally:
venv1.close()
venv2.close()
env1.close()
env2.close()
@pytest.mark.parametrize('klass', (ShmemVecEnv, SubprocVecEnv))
@@ -64,7 +63,7 @@ def test_vec_env(klass, dtype): # pylint: disable=R0914
fns = [make_fn(i) for i in range(num_envs)]
env1 = DummyVecEnv(fns)
env2 = klass(fns)
assert_venvs_equal(env1, env2, num_steps=num_steps)
assert_envs_equal(env1, env2, num_steps=num_steps)
class SimpleEnv(gym.Env):
@@ -100,15 +99,3 @@ class SimpleEnv(gym.Env):
def render(self, mode=None):
raise NotImplementedError
@with_mpi()
def test_mpi_with_subprocvecenv():
shape = (2,3,4)
nenv = 1
venv = SubprocVecEnv([lambda: SimpleEnv(0, shape, 'float32')] * nenv)
ob = venv.reset()
venv.close()
assert ob.shape == (nenv,) + shape

View File

@@ -1,223 +0,0 @@
import contextlib
import os
from abc import ABC, abstractmethod
from baselines.common.tile_images import tile_images
class AlreadySteppingError(Exception):
"""
Raised when an asynchronous step is running while
step_async() is called again.
"""
def __init__(self):
msg = 'already running an async step'
Exception.__init__(self, msg)
class NotSteppingError(Exception):
"""
Raised when an asynchronous step is not running but
step_wait() is called.
"""
def __init__(self):
msg = 'not running an async step'
Exception.__init__(self, msg)
class VecEnv(ABC):
"""
An abstract asynchronous, vectorized environment.
Used to batch data from multiple copies of an environment, so that
each observation becomes an batch of observations, and expected action is a batch of actions to
be applied per-environment.
"""
closed = False
viewer = None
metadata = {
'render.modes': ['human', 'rgb_array']
}
def __init__(self, num_envs, observation_space, action_space):
self.num_envs = num_envs
self.observation_space = observation_space
self.action_space = action_space
@abstractmethod
def reset(self):
"""
Reset all the environments and return an array of
observations, or a dict of observation arrays.
If step_async is still doing work, that work will
be cancelled and step_wait() should not be called
until step_async() is invoked again.
"""
pass
@abstractmethod
def step_async(self, actions):
"""
Tell all the environments to start taking a step
with the given actions.
Call step_wait() to get the results of the step.
You should not call this if a step_async run is
already pending.
"""
pass
@abstractmethod
def step_wait(self):
"""
Wait for the step taken with step_async().
Returns (obs, rews, dones, infos):
- obs: an array of observations, or a dict of
arrays of observations.
- rews: an array of rewards
- dones: an array of "episode done" booleans
- infos: a sequence of info objects
"""
pass
def close_extras(self):
"""
Clean up the extra resources, beyond what's in this base class.
Only runs when not self.closed.
"""
pass
def close(self):
if self.closed:
return
if self.viewer is not None:
self.viewer.close()
self.close_extras()
self.closed = True
def step(self, actions):
"""
Step the environments synchronously.
This is available for backwards compatibility.
"""
self.step_async(actions)
return self.step_wait()
def render(self, mode='human'):
imgs = self.get_images()
bigimg = tile_images(imgs)
if mode == 'human':
self.get_viewer().imshow(bigimg)
return self.get_viewer().isopen
elif mode == 'rgb_array':
return bigimg
else:
raise NotImplementedError
def get_images(self):
"""
Return RGB images from each environment
"""
raise NotImplementedError
@property
def unwrapped(self):
if isinstance(self, VecEnvWrapper):
return self.venv.unwrapped
else:
return self
def get_viewer(self):
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.SimpleImageViewer()
return self.viewer
class VecEnvWrapper(VecEnv):
"""
An environment wrapper that applies to an entire batch
of environments at once.
"""
def __init__(self, venv, observation_space=None, action_space=None):
self.venv = venv
super().__init__(num_envs=venv.num_envs,
observation_space=observation_space or venv.observation_space,
action_space=action_space or venv.action_space)
def step_async(self, actions):
self.venv.step_async(actions)
@abstractmethod
def reset(self):
pass
@abstractmethod
def step_wait(self):
pass
def close(self):
return self.venv.close()
def render(self, mode='human'):
return self.venv.render(mode=mode)
def get_images(self):
return self.venv.get_images()
def __getattr__(self, name):
if name.startswith('_'):
raise AttributeError("attempted to get missing private attribute '{}'".format(name))
return getattr(self.venv, name)
class VecEnvObservationWrapper(VecEnvWrapper):
@abstractmethod
def process(self, obs):
pass
def reset(self):
obs = self.venv.reset()
return self.process(obs)
def step_wait(self):
obs, rews, dones, infos = self.venv.step_wait()
return self.process(obs), rews, dones, infos
class CloudpickleWrapper(object):
"""
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
"""
def __init__(self, x):
self.x = x
def __getstate__(self):
import cloudpickle
return cloudpickle.dumps(self.x)
def __setstate__(self, ob):
import pickle
self.x = pickle.loads(ob)
@contextlib.contextmanager
def clear_mpi_env_vars():
"""
from mpi4py import MPI will call MPI_Init by default. If the child process has MPI environment variables, MPI will think that the child process is an MPI process just like the parent and do bad things such as hang.
This context manager is a hacky way to clear those environment variables temporarily such as when we are starting multiprocessing
Processes.
"""
removed_environment = {}
for k, v in list(os.environ.items()):
for prefix in ['OMPI_', 'PMI_']:
if k.startswith(prefix):
removed_environment[k] = v
del os.environ[k]
try:
yield
finally:
os.environ.update(removed_environment)

View File

@@ -1,4 +1,4 @@
from .vec_env import VecEnvWrapper
from . import VecEnvWrapper
import numpy as np
from gym import spaces

View File

@@ -2,25 +2,15 @@ from . import VecEnvWrapper
from baselines.bench.monitor import ResultsWriter
import numpy as np
import time
from collections import deque
class VecMonitor(VecEnvWrapper):
def __init__(self, venv, filename=None, keep_buf=0, info_keywords=()):
def __init__(self, venv, filename=None):
VecEnvWrapper.__init__(self, venv)
self.eprets = None
self.eplens = None
self.epcount = 0
self.tstart = time.time()
if filename:
self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart},
extra_keys=info_keywords)
else:
self.results_writer = None
self.info_keywords = info_keywords
self.keep_buf = keep_buf
if self.keep_buf:
self.epret_buf = deque([], maxlen=keep_buf)
self.eplen_buf = deque([], maxlen=keep_buf)
self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart})
def reset(self):
obs = self.venv.reset()
@@ -32,24 +22,16 @@ class VecMonitor(VecEnvWrapper):
obs, rews, dones, infos = self.venv.step_wait()
self.eprets += rews
self.eplens += 1
newinfos = list(infos[:])
for i in range(len(dones)):
if dones[i]:
info = infos[i].copy()
ret = self.eprets[i]
eplen = self.eplens[i]
newinfos = []
for (i, (done, ret, eplen, info)) in enumerate(zip(dones, self.eprets, self.eplens, infos)):
info = info.copy()
if done:
epinfo = {'r': ret, 'l': eplen, 't': round(time.time() - self.tstart, 6)}
for k in self.info_keywords:
epinfo[k] = info[k]
info['episode'] = epinfo
if self.keep_buf:
self.epret_buf.append(ret)
self.eplen_buf.append(eplen)
self.epcount += 1
self.eprets[i] = 0
self.eplens[i] = 0
if self.results_writer:
self.results_writer.write_row(epinfo)
newinfos[i] = info
self.results_writer.write_row(epinfo)
newinfos.append(info)
return obs, rews, dones, newinfos

View File

@@ -1,22 +1,18 @@
from . import VecEnvWrapper
from baselines.common.running_mean_std import RunningMeanStd
import numpy as np
class VecNormalize(VecEnvWrapper):
"""
A vectorized wrapper that normalizes the observations
and returns from an environment.
"""
def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8, use_tf=False):
def __init__(self, venv, ob=True, ret=True, clipob=10., cliprew=10., gamma=0.99, epsilon=1e-8):
VecEnvWrapper.__init__(self, venv)
if use_tf:
from baselines.common.running_mean_std import TfRunningMeanStd
self.ob_rms = TfRunningMeanStd(shape=self.observation_space.shape, scope='ob_rms') if ob else None
self.ret_rms = TfRunningMeanStd(shape=(), scope='ret_rms') if ret else None
else:
from baselines.common.running_mean_std import RunningMeanStd
self.ob_rms = RunningMeanStd(shape=self.observation_space.shape) if ob else None
self.ret_rms = RunningMeanStd(shape=()) if ret else None
self.ob_rms = RunningMeanStd(shape=self.observation_space.shape) if ob else None
self.ret_rms = RunningMeanStd(shape=()) if ret else None
self.clipob = clipob
self.cliprew = cliprew
self.ret = np.zeros(self.num_envs)

View File

@@ -1,10 +0,0 @@
from .vec_env import VecEnvObservationWrapper
class VecExtractDictObs(VecEnvObservationWrapper):
def __init__(self, venv, key):
self.key = key
super().__init__(venv=venv,
observation_space=venv.observation_space.spaces[self.key])
def process(self, obs):
return obs[self.key]

View File

@@ -1,29 +0,0 @@
import gym
class TimeLimit(gym.Wrapper):
def __init__(self, env, max_episode_steps=None):
super(TimeLimit, self).__init__(env)
self._max_episode_steps = max_episode_steps
self._elapsed_steps = 0
def step(self, ac):
observation, reward, done, info = self.env.step(ac)
self._elapsed_steps += 1
if self._elapsed_steps >= self._max_episode_steps:
done = True
info['TimeLimit.truncated'] = True
return observation, reward, done, info
def reset(self, **kwargs):
self._elapsed_steps = 0
return self.env.reset(**kwargs)
class ClipActionsWrapper(gym.Wrapper):
def step(self, action):
import numpy as np
action = np.nan_to_num(action)
action = np.clip(action, self.action_space.low, self.action_space.high)
return self.env.step(action)
def reset(self, **kwargs):
return self.env.reset(**kwargs)

View File

@@ -217,9 +217,7 @@ def learn(network, env,
stats = agent.get_stats()
combined_stats = stats.copy()
combined_stats['rollout/return'] = np.mean(epoch_episode_rewards)
combined_stats['rollout/return_std'] = np.std(epoch_episode_rewards)
combined_stats['rollout/return_history'] = np.mean(episode_rewards_history)
combined_stats['rollout/return_history_std'] = np.std(episode_rewards_history)
combined_stats['rollout/episode_steps'] = np.mean(epoch_episode_steps)
combined_stats['rollout/actions_mean'] = np.mean(epoch_actions)
combined_stats['rollout/Q_mean'] = np.mean(epoch_qs)

View File

@@ -17,7 +17,7 @@ except ImportError:
def normalize(x, stats):
if stats is None:
return x
return (x - stats.mean) / (stats.std + 1e-8)
return (x - stats.mean) / stats.std
def denormalize(x, stats):

View File

@@ -1,6 +1,7 @@
from baselines.common.tests.util import smoketest
from baselines.run import main as M
def _run(argstr):
smoketest('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr)
M(('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr).split(' '))
def test_popart():
_run('--normalize_returns=True --popart=True')

View File

@@ -23,7 +23,7 @@ def model(inpt, num_actions, scope, reuse=False):
if __name__ == '__main__':
with U.make_session(num_cpu=8):
with U.make_session(8):
# Create the environment
env = gym.make("CartPole-v0")
# Create all the functions necessary to train the model

View File

@@ -20,7 +20,7 @@ class TfInput(object):
"""
raise NotImplementedError
def make_feed_dict(self, data):
def make_feed_dict(data):
"""Given data input it to the placeholder(s)."""
raise NotImplementedError

View File

@@ -12,13 +12,13 @@ Download the expert data into `./data`, [download link](https://drive.google.com
### Step 2: Run GAIL
Run with single rank:
Run with single thread:
```bash
python -m baselines.gail.run_mujoco
```
Run with multiple ranks:
Run with multiple threads:
```bash
mpirun -np 16 python -m baselines.gail.run_mujoco

View File

@@ -66,7 +66,7 @@ class TransitionClassifier(object):
with tf.variable_scope("obfilter"):
self.obs_rms = RunningMeanStd(shape=self.observation_shape)
obs = (obs_ph - self.obs_rms.mean) / self.obs_rms.std
obs = (obs_ph - self.obs_rms.mean / self.obs_rms.std)
_input = tf.concat([obs, acs_ph], axis=1) # concatenate the two input -> form a transition
p_h1 = tf.contrib.layers.fully_connected(_input, self.hidden_size, activation_fn=tf.nn.tanh)
p_h2 = tf.contrib.layers.fully_connected(p_h1, self.hidden_size, activation_fn=tf.nn.tanh)

View File

@@ -50,12 +50,8 @@ class Mujoco_Dset(object):
# obs, acs: shape (N, L, ) + S where N = # episodes, L = episode length
# and S is the environment observation/action space.
# Flatten to (N * L, prod(S))
if len(obs.shape) > 2:
self.obs = np.reshape(obs, [-1, np.prod(obs.shape[2:])])
self.acs = np.reshape(acs, [-1, np.prod(acs.shape[2:])])
else:
self.obs = np.vstack(obs)
self.acs = np.vstack(acs)
self.obs = np.reshape(obs, [-1, np.prod(obs.shape[2:])])
self.acs = np.reshape(acs, [-1, np.prod(acs.shape[2:])])
self.rets = traj_data['ep_rets'][:traj_limitation]
self.avg_ret = sum(self.rets)/len(self.rets)

View File

@@ -108,7 +108,7 @@ def learn(*, network, env, total_timesteps,
# Prepare params.
params = config.DEFAULT_PARAMS
env_name = env.spec.id
env_name = env.specs[0].id
params['env_name'] = env_name
params['replay_strategy'] = replay_strategy
if env_name in config.DEFAULT_ENV_PARAMS:

View File

@@ -7,7 +7,6 @@ import time
import datetime
import tempfile
from collections import defaultdict
from contextlib import contextmanager
DEBUG = 10
INFO = 20
@@ -38,8 +37,8 @@ class HumanOutputFormat(KVWriter, SeqWriter):
# Create strings for printing
key2str = {}
for (key, val) in sorted(kvs.items()):
if hasattr(val, '__float__'):
valstr = '%-8.3g' % val
if isinstance(val, float):
valstr = '%-8.3g' % (val,)
else:
valstr = str(val)
key2str[self._truncate(key)] = self._truncate(valstr)
@@ -69,8 +68,7 @@ class HumanOutputFormat(KVWriter, SeqWriter):
self.file.flush()
def _truncate(self, s):
maxlen = 30
return s[:maxlen-3] + '...' if len(s) > maxlen else s
return s[:20] + '...' if len(s) > 23 else s
def writeseq(self, seq):
seq = list(seq)
@@ -92,6 +90,7 @@ class JSONOutputFormat(KVWriter):
def writekvs(self, kvs):
for k, v in sorted(kvs.items()):
if hasattr(v, 'dtype'):
v = v.tolist()
kvs[k] = float(v)
self.file.write(json.dumps(kvs) + '\n')
self.file.flush()
@@ -196,13 +195,13 @@ def logkv(key, val):
Call this once for each diagnostic quantity, each iteration
If called many times, last value will be used.
"""
get_current().logkv(key, val)
Logger.CURRENT.logkv(key, val)
def logkv_mean(key, val):
"""
The same as logkv(), but if called many times, values averaged.
"""
get_current().logkv_mean(key, val)
Logger.CURRENT.logkv_mean(key, val)
def logkvs(d):
"""
@@ -214,18 +213,21 @@ def logkvs(d):
def dumpkvs():
"""
Write all of the diagnostics from the current iteration
level: int. (see logger.py docs) If the global logger level is higher than
the level argument here, don't print to stdout.
"""
return get_current().dumpkvs()
Logger.CURRENT.dumpkvs()
def getkvs():
return get_current().name2val
return Logger.CURRENT.name2val
def log(*args, level=INFO):
"""
Write the sequence of args, with no separators, to the console and output files (if you've configured an output file).
"""
get_current().log(*args, level=level)
Logger.CURRENT.log(*args, level=level)
def debug(*args):
log(*args, level=DEBUG)
@@ -244,29 +246,30 @@ def set_level(level):
"""
Set logging threshold on current logger.
"""
get_current().set_level(level)
def set_comm(comm):
get_current().set_comm(comm)
Logger.CURRENT.set_level(level)
def get_dir():
"""
Get directory that log files are being written to.
will be None if there is no output directory (i.e., if you didn't call start)
"""
return get_current().get_dir()
return Logger.CURRENT.get_dir()
record_tabular = logkv
dump_tabular = dumpkvs
@contextmanager
def profile_kv(scopename):
logkey = 'wait_' + scopename
tstart = time.time()
try:
yield
finally:
get_current().name2val[logkey] += time.time() - tstart
class ProfileKV:
"""
Usage:
with logger.ProfileKV("interesting_scope"):
code
"""
def __init__(self, n):
self.n = "wait_" + n
def __enter__(self):
self.t1 = time.time()
def __exit__(self ,type, value, traceback):
Logger.CURRENT.name2val[self.n] += time.time() - self.t1
def profile(n):
"""
@@ -276,7 +279,7 @@ def profile(n):
"""
def decorator_with_name(func):
def func_wrapper(*args, **kwargs):
with profile_kv(n):
with ProfileKV(n):
return func(*args, **kwargs)
return func_wrapper
return decorator_with_name
@@ -286,25 +289,17 @@ def profile(n):
# Backend
# ================================================================
def get_current():
if Logger.CURRENT is None:
_configure_default_logger()
return Logger.CURRENT
class Logger(object):
DEFAULT = None # A logger with no output files. (See right below class definition)
# So that you can still log to the terminal without setting up any output files
CURRENT = None # Current logger being used by the free functions above
def __init__(self, dir, output_formats, comm=None):
def __init__(self, dir, output_formats):
self.name2val = defaultdict(float) # values this iteration
self.name2cnt = defaultdict(int)
self.level = INFO
self.dir = dir
self.output_formats = output_formats
self.comm = comm
# Logging API, forwarded
# ----------------------------------------
@@ -312,27 +307,20 @@ class Logger(object):
self.name2val[key] = val
def logkv_mean(self, key, val):
if val is None:
self.name2val[key] = None
return
oldval, cnt = self.name2val[key], self.name2cnt[key]
self.name2val[key] = oldval*cnt/(cnt+1) + val/(cnt+1)
self.name2cnt[key] = cnt + 1
def dumpkvs(self):
if self.comm is None:
d = self.name2val
else:
from baselines.common import mpi_util
d = mpi_util.mpi_weighted_mean(self.comm,
{name : (val, self.name2cnt.get(name, 1))
for (name, val) in self.name2val.items()})
if self.comm.rank != 0:
d['dummy'] = 1 # so we don't get a warning about empty dict
out = d.copy() # Return the dict for unit testing purposes
if self.level == DISABLED: return
for fmt in self.output_formats:
if isinstance(fmt, KVWriter):
fmt.writekvs(d)
fmt.writekvs(self.name2val)
self.name2val.clear()
self.name2cnt.clear()
return out
def log(self, *args, level=INFO):
if self.level <= level:
@@ -343,9 +331,6 @@ class Logger(object):
def set_level(self, level):
self.level = level
def set_comm(self, comm):
self.comm = comm
def get_dir(self):
return self.dir
@@ -360,19 +345,7 @@ class Logger(object):
if isinstance(fmt, SeqWriter):
fmt.writeseq(map(str, args))
def get_rank_without_mpi_import():
# check environment variables here instead of importing mpi4py
# to avoid calling MPI_Init() when this module is imported
for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']:
if varname in os.environ:
return int(os.environ[varname])
return 0
def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
"""
If comm is provided, average all numerical stats across that comm
"""
def configure(dir=None, format_strs=None):
if dir is None:
dir = os.getenv('OPENAI_LOGDIR')
if dir is None:
@@ -381,9 +354,15 @@ def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
assert isinstance(dir, str)
os.makedirs(dir, exist_ok=True)
rank = get_rank_without_mpi_import()
log_suffix = ''
rank = 0
# check environment variables here instead of importing mpi4py
# to avoid calling MPI_Init() when this module is imported
for varname in ['PMI_RANK', 'OMPI_COMM_WORLD_RANK']:
if varname in os.environ:
rank = int(os.environ[varname])
if rank > 0:
log_suffix = log_suffix + "-rank%03i" % rank
log_suffix = "-rank%03i" % rank
if format_strs is None:
if rank == 0:
@@ -393,11 +372,15 @@ def configure(dir=None, format_strs=None, comm=None, log_suffix=''):
format_strs = filter(None, format_strs)
output_formats = [make_output_format(f, dir, log_suffix) for f in format_strs]
Logger.CURRENT = Logger(dir=dir, output_formats=output_formats, comm=comm)
Logger.CURRENT = Logger(dir=dir, output_formats=output_formats)
log('Logging to %s'%dir)
def _configure_default_logger():
configure()
format_strs = None
# keep the old default of only writing to stdout
if 'OPENAI_LOG_FORMAT' not in os.environ:
format_strs = ['stdout']
configure(format_strs=format_strs)
Logger.DEFAULT = Logger.CURRENT
def reset():
@@ -406,15 +389,17 @@ def reset():
Logger.CURRENT = Logger.DEFAULT
log('Reset logger')
@contextmanager
def scoped_configure(dir=None, format_strs=None, comm=None):
prevlogger = Logger.CURRENT
configure(dir=dir, format_strs=format_strs, comm=comm)
try:
yield
finally:
class scoped_configure(object):
def __init__(self, dir=None, format_strs=None):
self.dir = dir
self.format_strs = format_strs
self.prevlogger = None
def __enter__(self):
self.prevlogger = Logger.CURRENT
configure(dir=self.dir, format_strs=self.format_strs)
def __exit__(self, *args):
Logger.CURRENT.close()
Logger.CURRENT = prevlogger
Logger.CURRENT = self.prevlogger
# ================================================================
@@ -438,7 +423,7 @@ def _demo():
logkv_mean("b", -44.4)
logkv("a", 5.5)
dumpkvs()
info("^^^ should see b = -33.3")
info("^^^ should see b = 33.3")
logkv("b", -2.5)
dumpkvs()
@@ -471,6 +456,7 @@ def read_tb(path):
import pandas
import numpy as np
from glob import glob
from collections import defaultdict
import tensorflow as tf
if osp.isdir(path):
fnames = glob(osp.join(path, "events.*"))
@@ -496,5 +482,8 @@ def read_tb(path):
data[step-1, colidx] = value
return pandas.DataFrame(data, columns=tags)
# configure the default logger on import
_configure_default_logger()
if __name__ == "__main__":
_demo()

View File

@@ -97,6 +97,7 @@ def learn(env, policy_fn, *,
ret = tf.placeholder(dtype=tf.float32, shape=[None]) # Empirical return
lrmult = tf.placeholder(name='lrmult', dtype=tf.float32, shape=[]) # learning rate multiplier, updated with schedule
clip_param = clip_param * lrmult # Annealed clipping parameter epsilon
ob = U.get_placeholder_cached(name="ob")
ac = pi.pdtype.sample_placeholder([None])
@@ -167,7 +168,7 @@ def learn(env, policy_fn, *,
ob, ac, atarg, tdlamret = seg["ob"], seg["ac"], seg["adv"], seg["tdlamret"]
vpredbefore = seg["vpred"] # predicted value function before udpate
atarg = (atarg - atarg.mean()) / atarg.std() # standardized advantage function estimate
d = Dataset(dict(ob=ob, ac=ac, atarg=atarg, vtarg=tdlamret), deterministic=pi.recurrent)
d = Dataset(dict(ob=ob, ac=ac, atarg=atarg, vtarg=tdlamret), shuffle=not pi.recurrent)
optim_batchsize = optim_batchsize or ob.shape[0]
if hasattr(pi, "ob_rms"): pi.ob_rms.update(ob) # update running mean/std for policy

View File

@@ -19,17 +19,16 @@ def train(num_timesteps, seed, model_path=None):
# these are good enough to make humanoid walk, but whether those are
# an absolute best or not is not certain
env = RewScale(env, 0.1)
logger.log("NOTE: reward will be scaled by a factor of 10 in logged stats. Check the monitor for unscaled reward.")
pi = pposgd_simple.learn(env, policy_fn,
max_timesteps=num_timesteps,
timesteps_per_actorbatch=2048,
clip_param=0.1, entcoeff=0.0,
clip_param=0.2, entcoeff=0.0,
optim_epochs=10,
optim_stepsize=1e-4,
optim_stepsize=3e-4,
optim_batchsize=64,
gamma=0.99,
lam=0.95,
schedule='constant',
schedule='linear',
)
env.close()
if model_path:
@@ -48,7 +47,7 @@ def main():
logger.configure()
parser = mujoco_arg_parser()
parser.add_argument('--model-path', default=os.path.join(logger.get_dir(), 'humanoid_policy'))
parser.set_defaults(num_timesteps=int(5e7))
parser.set_defaults(num_timesteps=int(2e7))
args = parser.parse_args()
@@ -69,5 +68,8 @@ def main():
if done:
ob = env.reset()
if __name__ == '__main__':
main()

View File

@@ -18,7 +18,7 @@ def atari():
lam=0.95, gamma=0.99, noptepochs=4, log_interval=1,
ent_coef=.01,
lr=lambda f : f * 2.5e-4,
cliprange=0.1,
cliprange=lambda f : f * 0.1,
)
def retro():

View File

@@ -8,7 +8,7 @@ class MicrobatchedModel(Model):
on the entire minibatch causes some overflow
"""
def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight, comm, microbatch_size):
nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size):
self.nmicrobatches = nbatch_train // microbatch_size
self.microbatch_size = microbatch_size
@@ -23,9 +23,7 @@ class MicrobatchedModel(Model):
nsteps=nsteps,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm,
mpi_rank_weight=mpi_rank_weight,
comm=comm)
max_grad_norm=max_grad_norm)
self.grads_ph = [tf.placeholder(dtype=g.dtype, shape=g.shape) for g in self.grads]
grads_ph_and_vars = list(zip(self.grads_ph, self.var))

View File

@@ -25,12 +25,9 @@ class Model(object):
- Save load the model
"""
def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
nsteps, ent_coef, vf_coef, max_grad_norm, mpi_rank_weight=1, comm=None, microbatch_size=None):
nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size=None):
self.sess = sess = get_session()
if MPI is not None and comm is None:
comm = MPI.COMM_WORLD
with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE):
# CREATE OUR TWO MODELS
# act_model that is used for sampling
@@ -94,8 +91,8 @@ class Model(object):
# 1. Get the model parameters
params = tf.trainable_variables('ppo2_model')
# 2. Build our trainer
if comm is not None and comm.Get_size() > 1:
self.trainer = MpiAdamOptimizer(comm, learning_rate=LR, mpi_rank_weight=mpi_rank_weight, epsilon=1e-5)
if MPI is not None:
self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
else:
self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients
@@ -128,7 +125,7 @@ class Model(object):
initialize()
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
if MPI is not None:
sync_from_root(sess, global_variables, comm=comm) #pylint: disable=E1101
sync_from_root(sess, global_variables) #pylint: disable=E1101
def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
# Here we calculate advantage A(s,a) = R + yV(s') - V(s)

View File

@@ -21,7 +21,7 @@ def constfn(val):
def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4,
vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
save_interval=0, load_path=None, model_fn=None, update_fn=None, init_fn=None, mpi_rank_weight=1, comm=None, **network_kwargs):
save_interval=0, load_path=None, model_fn=None, **network_kwargs):
'''
Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347)
@@ -97,7 +97,6 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
# Calculate the batch_size
nbatch = nenvs * nsteps
nbatch_train = nbatch // nminibatches
is_mpi_root = (MPI is None or MPI.COMM_WORLD.Get_rank() == 0)
# Instantiate the model object (that creates act_model and train_model)
if model_fn is None:
@@ -106,7 +105,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
model = model_fn(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=nenvs, nbatch_train=nbatch_train,
nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef,
max_grad_norm=max_grad_norm, comm=comm, mpi_rank_weight=mpi_rank_weight)
max_grad_norm=max_grad_norm)
if load_path is not None:
model.load(load_path)
@@ -119,32 +118,24 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
if eval_env is not None:
eval_epinfobuf = deque(maxlen=100)
if init_fn is not None:
init_fn()
# Start total timer
tfirststart = time.perf_counter()
tfirststart = time.time()
nupdates = total_timesteps//nbatch
for update in range(1, nupdates+1):
assert nbatch % nminibatches == 0
# Start timer
tstart = time.perf_counter()
tstart = time.time()
frac = 1.0 - (update - 1.0) / nupdates
# Calculate the learning rate
lrnow = lr(frac)
# Calculate the cliprange
cliprangenow = cliprange(frac)
if update % log_interval == 0 and is_mpi_root: logger.info('Stepping environment...')
# Get minibatch
obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632
if eval_env is not None:
eval_obs, eval_returns, eval_masks, eval_actions, eval_values, eval_neglogpacs, eval_states, eval_epinfos = eval_runner.run() #pylint: disable=E0632
if update % log_interval == 0 and is_mpi_root: logger.info('Done.')
epinfobuf.extend(epinfos)
if eval_env is not None:
eval_epinfobuf.extend(eval_epinfos)
@@ -169,6 +160,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
envsperbatch = nenvs // nminibatches
envinds = np.arange(nenvs)
flatinds = np.arange(nenvs * nsteps).reshape(nenvs, nsteps)
envsperbatch = nbatch_train // nsteps
for _ in range(noptepochs):
np.random.shuffle(envinds)
for start in range(0, nenvs, envsperbatch):
@@ -182,39 +174,34 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
# Feedforward --> get losses --> update
lossvals = np.mean(mblossvals, axis=0)
# End timer
tnow = time.perf_counter()
tnow = time.time()
# Calculate the fps (frame per second)
fps = int(nbatch / (tnow - tstart))
if update_fn is not None:
update_fn(update)
if update % log_interval == 0 or update == 1:
# Calculates if value function is a good predicator of the returns (ev > 1)
# or if it's just worse than predicting nothing (ev =< 0)
ev = explained_variance(values, returns)
logger.logkv("misc/serial_timesteps", update*nsteps)
logger.logkv("misc/nupdates", update)
logger.logkv("misc/total_timesteps", update*nbatch)
logger.logkv("serial_timesteps", update*nsteps)
logger.logkv("nupdates", update)
logger.logkv("total_timesteps", update*nbatch)
logger.logkv("fps", fps)
logger.logkv("misc/explained_variance", float(ev))
logger.logkv("explained_variance", float(ev))
logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf]))
if eval_env is not None:
logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]) )
logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]) )
logger.logkv('misc/time_elapsed', tnow - tfirststart)
logger.logkv('time_elapsed', tnow - tfirststart)
for (lossval, lossname) in zip(lossvals, model.loss_names):
logger.logkv('loss/' + lossname, lossval)
logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and is_mpi_root:
logger.logkv(lossname, lossval)
if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
checkdir = osp.join(logger.get_dir(), 'checkpoints')
os.makedirs(checkdir, exist_ok=True)
savepath = osp.join(checkdir, '%.5i'%update)
print('Saving to', savepath)
model.save(savepath)
return model
# Avoid division error when calculate the mean (in our case if epinfo is empty returns np.nan, not return an error)
def safemean(xs):

View File

@@ -25,11 +25,10 @@ def test_microbatches():
env_test = DummyVecEnv([env_fn])
sess_test = make_session(make_default=True, graph=tf.Graph())
learn_fn(env=env_test, model_fn=partial(MicrobatchedModel, microbatch_size=2))
# learn_fn(env=env_test)
vars_test = {v.name: sess_test.run(v) for v in tf.trainable_variables()}
for v in vars_ref:
np.testing.assert_allclose(vars_ref[v], vars_test[v], atol=3e-3)
np.testing.assert_allclose(vars_ref[v], vars_test[v], atol=1e-3)
if __name__ == '__main__':
test_microbatches()

View File

@@ -1,5 +1,4 @@
import sys
import re
import multiprocessing
import os.path as osp
import gym
@@ -7,13 +6,15 @@ from collections import defaultdict
import tensorflow as tf
import numpy as np
from baselines.common.vec_env import VecFrameStack, VecNormalize, VecEnv
from baselines.common.vec_env.vec_video_recorder import VecVideoRecorder
from baselines.common.vec_env.vec_frame_stack import VecFrameStack
from baselines.common.cmd_util import common_arg_parser, parse_unknown_args, make_vec_env, make_env
from baselines.common.tf_util import get_session
from baselines import logger
from importlib import import_module
from baselines.common.vec_env.vec_normalize import VecNormalize
try:
from mpi4py import MPI
except ImportError:
@@ -51,7 +52,7 @@ _game_envs['retro'] = {
def train(args, extra_args):
env_type, env_id = get_env_type(args)
env_type, env_id = get_env_type(args.env)
print('env_type: {}'.format(env_type))
total_timesteps = int(args.num_timesteps)
@@ -63,7 +64,7 @@ def train(args, extra_args):
env = build_env(args)
if args.save_video_interval != 0:
env = VecVideoRecorder(env, osp.join(logger.get_dir(), "videos"), record_video_trigger=lambda x: x % args.save_video_interval == 0, video_length=args.save_video_length)
env = VecVideoRecorder(env, osp.join(logger.Logger.CURRENT.dir, "videos"), record_video_trigger=lambda x: x % args.save_video_interval == 0, video_length=args.save_video_length)
if args.network:
alg_kwargs['network'] = args.network
@@ -90,7 +91,7 @@ def build_env(args):
alg = args.alg
seed = args.seed
env_type, env_id = get_env_type(args)
env_type, env_id = get_env_type(args.env)
if env_type in {'atari', 'retro'}:
if alg == 'deepq':
@@ -103,27 +104,22 @@ def build_env(args):
env = VecFrameStack(env, frame_stack_size)
else:
config = tf.ConfigProto(allow_soft_placement=True,
config = tf.ConfigProto(allow_soft_placement=True,
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1)
config.gpu_options.allow_growth = True
get_session(config=config)
config.gpu_options.allow_growth = True
get_session(config=config)
flatten_dict_observations = alg not in {'her'}
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)
flatten_dict_observations = alg not in {'her'}
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)
if env_type == 'mujoco':
env = VecNormalize(env, use_tf=True)
if env_type == 'mujoco':
env = VecNormalize(env)
return env
def get_env_type(args):
env_id = args.env
if args.env_type is not None:
return args.env_type, env_id
def get_env_type(env_id):
# Re-parse the gym registry, since we could have new envs since last time.
for env in gym.envs.registry.all():
env_type = env._entry_point.split(':')[0].split('.')[-1]
@@ -138,8 +134,6 @@ def get_env_type(args):
if env_id in e:
env_type = g
break
if ':' in env_id:
env_type = re.sub(r':.*', '', env_id)
assert env_type is not None, 'env_id {} is not recognized in env types'.format(env_id, _game_envs.keys())
return env_type, env_id
@@ -200,6 +194,9 @@ def main(args):
args, unknown_args = arg_parser.parse_known_args(args)
extra_args = parse_cmdline_kwargs(unknown_args)
if args.extra_import is not None:
import_module(args.extra_import)
if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
rank = 0
logger.configure()
@@ -208,6 +205,7 @@ def main(args):
rank = MPI.COMM_WORLD.Get_rank()
model, env = train(args, extra_args)
env.close()
if args.save_path is not None and rank == 0:
save_path = osp.expanduser(args.save_path)
@@ -215,28 +213,26 @@ def main(args):
if args.play:
logger.log("Running trained model")
env = build_env(args)
obs = env.reset()
state = model.initial_state if hasattr(model, 'initial_state') else None
dones = np.zeros((1,))
episode_rew = 0
while True:
if state is not None:
actions, _, state, _ = model.step(obs,S=state, M=dones)
else:
actions, _, _, _ = model.step(obs)
obs, rew, done, _ = env.step(actions)
episode_rew += rew[0] if isinstance(env, VecEnv) else rew
obs, _, done, _ = env.step(actions)
env.render()
done = done.any() if isinstance(done, np.ndarray) else done
if done:
print('episode_rew={}'.format(episode_rew))
episode_rew = 0
obs = env.reset()
env.close()
env.close()
return model

View File

@@ -120,7 +120,7 @@
<td>114.26</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>
@@ -152,7 +152,7 @@
<td>131.46</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>
@@ -184,7 +184,7 @@
<td>113.58</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>
@@ -216,7 +216,7 @@
<td>82.94</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>
@@ -248,7 +248,7 @@
<td>81.61</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>
@@ -280,7 +280,7 @@
<td>59.72</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>
@@ -312,7 +312,7 @@
<td>14.98</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
<td>cbd21ef</td>
</tr>

View File

@@ -12,9 +12,10 @@ extras = {
'filelock',
'pytest',
'pytest-forked',
'atari-py',
'matplotlib',
'pandas'
'atari-py'
],
'bullet': [
'pybullet',
],
'mpi': [
'mpi4py'
@@ -31,10 +32,12 @@ setup(name='baselines',
packages=[package for package in find_packages()
if package.startswith('baselines')],
install_requires=[
'gym>=0.10.0, <1.0.0',
'gym',
'scipy',
'tqdm',
'joblib',
'dill',
'progressbar2',
'cloudpickle',
'click',
'opencv-python'