From 6c44fb28fecdb666182e2951b7b1cbe9cf198ff1 Mon Sep 17 00:00:00 2001 From: pzhokhov Date: Wed, 19 Dec 2018 14:44:08 -0800 Subject: [PATCH] refactor HER - phase 1 (#767) * joshim5 changes (width and height to WarpFrame wrapper) * match network output with action distribution via a linear layer only if necessary (#167) * support color vs. grayscale option in WarpFrame wrapper (#166) * support color vs. grayscale option in WarpFrame wrapper * Support color in other wrappers * Updated per Peters suggestions * fixing test failures * ppo2 with microbatches (#168) * pass microbatch_size to the model during construction * microbatch fixes and test (#169) * microbatch fixes and test * tiny cleanup * added assertions to the test * vpg-related fix * Peterz joshim5 subclass ppo2 model (#170) * microbatch fixes and test * tiny cleanup * added assertions to the test * vpg-related fix * subclassing the model to make microbatched version of model WIP * made microbatched model a subclass of ppo2 Model * flake8 complaint * mpi-less ppo2 (resolving merge conflict) * flake8 and mpi4py imports in ppo2/model.py * more un-mpying * merge master * updates to the benchmark viewer code + autopep8 (#184) * viz docs and syntactic sugar wip * update viewer yaml to use persistent volume claims * move plot_util to baselines.common, update links * use 1Tb hard drive for results viewer * small updates to benchmark vizualizer code * autopep8 * autopep8 * any folder can be a benchmark * massage games image a little bit * fixed --preload option in app.py * remove preload from run_viewer.sh * remove pdb breakpoints * update bench-viewer.yaml * fixed bug (#185) * fixed bug it's wrong to do the else statement, because no other nodes would start. * changed the fix slightly * Refactor her phase 1 (#194) * add monitor to the rollout envs in her RUN BENCHMARKS her * Slice -> Slide in her benchmarks RUN BENCHMARKS her * run her benchmark for 200 epochs * dummy commit to RUN BENCHMARKS her * her benchmark for 500 epochs RUN BENCHMARKS her * add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her * add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her * add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her * disable saving of policies in her benchmark RUN BENCHMARKS her * run fetch benchmarks with ppo2 and ddpg RUN BENCHMARKS Fetch * run fetch benchmarks with ppo2 and ddpg RUN BENCHMARKS Fetch * launcher refactor wip * wip * her works on FetchReach * her runner refactor RUN BENCHMARKS Fetch1M * unit test for her * fixing warnings in mpi_average in her, skip test_fetchreach if mujoco is not present * pickle-based serialization in her * remove extra import from subproc_vec_env.py * investigating differences in rollout.py * try with old rollout code RUN BENCHMARKS her * temporarily use DummyVecEnv in cmd_util.py RUN BENCHMARKS her * dummy commit to RUN BENCHMARKS her * set info_values in rollout worker in her RUN BENCHMARKS her * bug in rollout_new.py RUN BENCHMARKS her * fixed bug in rollout_new.py RUN BENCHMARKS her * do not use last step because vecenv calls reset and returns obs after reset RUN BENCHMARKS her * updated buffer sizes RUN BENCHMARKS her * fixed loading/saving via joblib * dust off learning from demonstrations in HER, docs, refactor * add deprecation notice on her play and plot files * address comments by Matthias --- baselines/bench/benchmarks.py | 7 +- baselines/common/cmd_util.py | 17 +- baselines/common/plot_util.py | 5 +- 
baselines/common/tests/test_fetchreach.py | 39 +++ baselines/common/tests/util.py | 2 +- baselines/common/tf_util.py | 4 +- baselines/common/vec_env/dummy_vec_env.py | 1 + baselines/common/vec_env/shmem_vec_env.py | 1 + baselines/common/vec_env/subproc_vec_env.py | 19 +- baselines/her/README.md | 41 ++-- baselines/her/ddpg.py | 69 +++--- baselines/her/experiment/config.py | 29 ++- .../data_generation/fetch_data_generation.py | 31 +-- baselines/her/experiment/play.py | 1 + baselines/her/experiment/plot.py | 2 + baselines/her/experiment/train.py | 194 --------------- baselines/her/her.py | 228 ++++++++++++++---- baselines/her/her_sampler.py | 63 +++++ baselines/her/rollout.py | 66 ++--- baselines/run.py | 16 +- setup.cfg | 1 - 21 files changed, 454 insertions(+), 382 deletions(-) create mode 100644 baselines/common/tests/test_fetchreach.py delete mode 100644 baselines/her/experiment/train.py create mode 100644 baselines/her/her_sampler.py diff --git a/baselines/bench/benchmarks.py b/baselines/bench/benchmarks.py index c9fdd14..0d63e7a 100644 --- a/baselines/bench/benchmarks.py +++ b/baselines/bench/benchmarks.py @@ -156,9 +156,10 @@ register_benchmark({ # HER DDPG +_fetch_tasks = ['FetchReach-v1', 'FetchPush-v1', 'FetchSlide-v1'] register_benchmark({ - 'name': 'HerDdpg', - 'description': 'Smoke-test only benchmark of HER', - 'tasks': [{'trials': 1, 'env_id': 'FetchReach-v1'}] + 'name': 'Fetch1M', + 'description': 'Fetch* benchmarks for 1M timesteps', + 'tasks': [{'trials': 6, 'env_id': env_id, 'num_timesteps': int(1e6)} for env_id in _fetch_tasks] }) diff --git a/baselines/common/cmd_util.py b/baselines/common/cmd_util.py index 90b9868..650911e 100644 --- a/baselines/common/cmd_util.py +++ b/baselines/common/cmd_util.py @@ -18,11 +18,16 @@ from baselines.common.vec_env.subproc_vec_env import SubprocVecEnv from baselines.common.vec_env.dummy_vec_env import DummyVecEnv from baselines.common import retro_wrappers -def make_vec_env(env_id, env_type, num_env, seed, wrapper_kwargs=None, start_index=0, reward_scale=1.0, gamestate=None): +def make_vec_env(env_id, env_type, num_env, seed, + wrapper_kwargs=None, + start_index=0, + reward_scale=1.0, + flatten_dict_observations=True, + gamestate=None): """ Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo. 
""" - if wrapper_kwargs is None: wrapper_kwargs = {} + wrapper_kwargs = wrapper_kwargs or {} mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0 seed = seed + 10000 * mpi_rank if seed is not None else None def make_thunk(rank): @@ -33,6 +38,7 @@ def make_vec_env(env_id, env_type, num_env, seed, wrapper_kwargs=None, start_ind seed=seed, reward_scale=reward_scale, gamestate=gamestate, + flatten_dict_observations=flatten_dict_observations, wrapper_kwargs=wrapper_kwargs ) @@ -43,8 +49,9 @@ def make_vec_env(env_id, env_type, num_env, seed, wrapper_kwargs=None, start_ind return DummyVecEnv([make_thunk(start_index)]) -def make_env(env_id, env_type, subrank=0, seed=None, reward_scale=1.0, gamestate=None, wrapper_kwargs={}): +def make_env(env_id, env_type, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None): mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0 + wrapper_kwargs = wrapper_kwargs or {} if env_type == 'atari': env = make_atari(env_id) elif env_type == 'retro': @@ -54,6 +61,10 @@ def make_env(env_id, env_type, subrank=0, seed=None, reward_scale=1.0, gamestate else: env = gym.make(env_id) + if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict): + keys = env.observation_space.spaces.keys() + env = gym.wrappers.FlattenDictWrapper(env, dict_keys=list(keys)) + env.seed(seed + subrank if seed is not None else None) env = Monitor(env, logger.get_dir() and os.path.join(logger.get_dir(), str(mpi_rank) + '.' + str(subrank)), diff --git a/baselines/common/plot_util.py b/baselines/common/plot_util.py index 8009295..6a7d3d1 100644 --- a/baselines/common/plot_util.py +++ b/baselines/common/plot_util.py @@ -168,6 +168,7 @@ def load_results(root_dir_or_dirs, enable_progress=True, enable_monitor=True, ve - monitor - if enable_monitor is True, this field contains pandas dataframe with loaded monitor.csv file (or aggregate of all *.monitor.csv files in the directory) - progress - if enable_progress is True, this field contains pandas dataframe with loaded progress.csv file ''' + import re if isinstance(root_dir_or_dirs, str): rootdirs = [osp.expanduser(root_dir_or_dirs)] else: @@ -179,7 +180,9 @@ def load_results(root_dir_or_dirs, enable_progress=True, enable_monitor=True, ve if '-proc' in dirname: files[:] = [] continue - if set(['metadata.json', 'monitor.json', 'monitor.csv', 'progress.json', 'progress.csv']).intersection(files): + monitor_re = re.compile(r'(\d+\.)?(\d+\.)?monitor\.csv') + if set(['metadata.json', 'monitor.json', 'progress.json', 'progress.csv']).intersection(files) or \ + any([f for f in files if monitor_re.match(f)]): # also match monitor files like 0.1.monitor.csv # used to be uncommented, which means do not go deeper than current directory if any of the data files # are found # dirs[:] = [] diff --git a/baselines/common/tests/test_fetchreach.py b/baselines/common/tests/test_fetchreach.py new file mode 100644 index 0000000..be73663 --- /dev/null +++ b/baselines/common/tests/test_fetchreach.py @@ -0,0 +1,39 @@ +import pytest +import gym + +from baselines.run import get_learn_function +from baselines.common.tests.util import reward_per_episode_test + +pytest.importorskip('mujoco_py') + +common_kwargs = dict( + network='mlp', + seed=0, +) + +learn_kwargs = { + 'her': dict(total_timesteps=2000) +} + +@pytest.mark.slow +@pytest.mark.parametrize("alg", learn_kwargs.keys()) +def test_fetchreach(alg): + ''' + Test if the algorithm (with an mlp policy) + can learn the FetchReach task + ''' + + kwargs = 
common_kwargs.copy() + kwargs.update(learn_kwargs[alg]) + + learn_fn = lambda e: get_learn_function(alg)(env=e, **kwargs) + def env_fn(): + + env = gym.make('FetchReach-v1') + env.seed(0) + return env + + reward_per_episode_test(env_fn, learn_fn, -15) + +if __name__ == '__main__': + test_fetchreach('her') diff --git a/baselines/common/tests/util.py b/baselines/common/tests/util.py index 86a418e..51b9d0f 100644 --- a/baselines/common/tests/util.py +++ b/baselines/common/tests/util.py @@ -63,7 +63,7 @@ def rollout(env, model, n_trials): for i in range(n_trials): obs = env.reset() - state = model.initial_state + state = model.initial_state if hasattr(model, 'initial_state') else None episode_rew = [] episode_actions = [] episode_obs = [] diff --git a/baselines/common/tf_util.py b/baselines/common/tf_util.py index 8cc3df6..717b7dc 100644 --- a/baselines/common/tf_util.py +++ b/baselines/common/tf_util.py @@ -337,7 +337,7 @@ def save_state(fname, sess=None): def save_variables(save_path, variables=None, sess=None): sess = sess or get_session() - variables = variables or tf.trainable_variables() + variables = variables or tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) ps = sess.run(variables) save_dict = {v.name: value for v, value in zip(variables, ps)} @@ -348,7 +348,7 @@ def save_variables(save_path, variables=None, sess=None): def load_variables(load_path, variables=None, sess=None): sess = sess or get_session() - variables = variables or tf.trainable_variables() + variables = variables or tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) loaded_params = joblib.load(os.path.expanduser(load_path)) restores = [] diff --git a/baselines/common/vec_env/dummy_vec_env.py b/baselines/common/vec_env/dummy_vec_env.py index c2b86dd..339aa1e 100644 --- a/baselines/common/vec_env/dummy_vec_env.py +++ b/baselines/common/vec_env/dummy_vec_env.py @@ -27,6 +27,7 @@ class DummyVecEnv(VecEnv): self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32) self.buf_infos = [{} for _ in range(self.num_envs)] self.actions = None + self.specs = [e.spec for e in self.envs] def step_async(self, actions): listify = True diff --git a/baselines/common/vec_env/shmem_vec_env.py b/baselines/common/vec_env/shmem_vec_env.py index fcdcf47..99cc586 100644 --- a/baselines/common/vec_env/shmem_vec_env.py +++ b/baselines/common/vec_env/shmem_vec_env.py @@ -54,6 +54,7 @@ class ShmemVecEnv(VecEnv): proc.start() child_pipe.close() self.waiting_step = False + self.specs = [f().spec for f in env_fns] self.viewer = None def reset(self): diff --git a/baselines/common/vec_env/subproc_vec_env.py b/baselines/common/vec_env/subproc_vec_env.py index 4dc4d2c..b651e0d 100644 --- a/baselines/common/vec_env/subproc_vec_env.py +++ b/baselines/common/vec_env/subproc_vec_env.py @@ -57,6 +57,7 @@ class SubprocVecEnv(VecEnv): self.remotes[0].send(('get_spaces', None)) observation_space, action_space = self.remotes[0].recv() self.viewer = None + self.specs = [f().spec for f in env_fns] VecEnv.__init__(self, len(env_fns), observation_space, action_space) def step_async(self, actions): @@ -70,13 +71,13 @@ class SubprocVecEnv(VecEnv): results = [remote.recv() for remote in self.remotes] self.waiting = False obs, rews, dones, infos = zip(*results) - return np.stack(obs), np.stack(rews), np.stack(dones), infos + return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos def reset(self): self._assert_not_closed() for remote in self.remotes: remote.send(('reset', None)) - return np.stack([remote.recv() for remote in self.remotes]) + return 
_flatten_obs([remote.recv() for remote in self.remotes]) def close_extras(self): self.closed = True @@ -97,3 +98,17 @@ class SubprocVecEnv(VecEnv): def _assert_not_closed(self): assert not self.closed, "Trying to operate on a SubprocVecEnv after calling close()" + + +def _flatten_obs(obs): + assert isinstance(obs, list) or isinstance(obs, tuple) + assert len(obs) > 0 + + if isinstance(obs[0], dict): + import collections + assert isinstance(obs, collections.OrderedDict) + keys = obs[0].keys() + return {k: np.stack([o[k] for o in obs]) for k in keys} + else: + return np.stack(obs) + diff --git a/baselines/her/README.md b/baselines/her/README.md index 9934c69..232b505 100644 --- a/baselines/her/README.md +++ b/baselines/her/README.md @@ -6,26 +6,29 @@ For details on Hindsight Experience Replay (HER), please read the [paper](https: ### Getting started Training an agent is very simple: ```bash -python -m baselines.her.experiment.train +python -m baselines.run --alg=her --env=FetchReach-v1 --num_timesteps=5000 ``` This will train a DDPG+HER agent on the `FetchReach` environment. You should see the success rate go up quickly to `1.0`, which means that the agent achieves the -desired goal in 100% of the cases. -The training script logs other diagnostics as well and pickles the best policy so far (w.r.t. to its test success rate), -the latest policy, and, if enabled, a history of policies every K epochs. - -To inspect what the agent has learned, use the play script: +desired goal in 100% of the cases (note how HER can solve it in <5k steps - try doing that with PPO by replacing her with ppo2 :)) +The training script logs other diagnostics as well. Policy at the end of the training can be saved using `--save_path` flag, for instance: ```bash -python -m baselines.her.experiment.play /path/to/an/experiment/policy_best.pkl +python -m baselines.run --alg=her --env=FetchReach-v1 --num_timesteps=5000 --save_path=~/policies/her/fetchreach5k ``` -You can try it right now with the results of the training step (the script prints out the path for you). -This should visualize the current policy for 10 episodes and will also print statistics. + +To inspect what the agent has learned, use the `--play` flag: +```bash +python -m baselines.run --alg=her --env=FetchReach-v1 --num_timesteps=5000 --play +``` +(note `--play` can be combined with `--load_path`, which lets one load trained policies, for more results see [README.md](../../README.md)) ### Reproducing results -In order to reproduce the results from [Plappert et al. (2018)](https://arxiv.org/abs/1802.09464), run the following command: +In [Plappert et al. (2018)](https://arxiv.org/abs/1802.09464), 38 trajectories were generated in parallel +(19 MPI processes, each generating computing gradients from 2 trajectories and aggregating). +To reproduce that behaviour, use ```bash -python -m baselines.her.experiment.train --num_cpu 19 +mpirun -np 19 python -m baselines.run --num_env=2 --alg=her ... ``` This will require a machine with sufficient amount of physical CPU cores. In our experiments, we used [Azure's D15v2 instances](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/sizes), @@ -45,6 +48,13 @@ python experiment/data_generation/fetch_data_generation.py ``` This outputs ```data_fetch_random_100.npz``` file which is our data file. 
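For reference, a minimal sketch (not part of the patch) of how the generated file can be inspected; it mirrors the layout that `init_demo_buffer()` in `baselines/her/ddpg.py` reads back, and `allow_pickle` is needed because the per-step observations are stored as dicts:
```python
import numpy as np

# The generation script saves one entry per episode:
#   obs[episode][t]  -> the env's dict observation ('observation', 'achieved_goal', 'desired_goal')
#   acs[episode][t]  -> the action taken at step t
#   info[episode][t] -> the env info dict at step t
demo = np.load('data_fetch_random_100.npz', allow_pickle=True)
obs, acs, info = demo['obs'], demo['acs'], demo['info']

print('episodes:', len(acs), 'actions in first episode:', len(acs[0]))
print(sorted(obs[0][0].keys()))  # ['achieved_goal', 'desired_goal', 'observation']
```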
+To launch training with demonstrations (more technically, with behaviour cloning loss as an auxilliary loss), run the following +```bash +python -m baselines.run --alg=her --env=FetchPickAndPlace-v1 --num_timesteps=2.5e6 --demo_file=/Path/to/demo_file.npz +``` +This will train a DDPG+HER agent on the `FetchPickAndPlace` environment by using previously generated demonstration data. +To inspect what the agent has learned, use the `--play` flag as described above. + #### Configuration The provided configuration is for training an agent with HER without demonstrations, we need to change a few paramters for the HER algorithm to learn through demonstrations, to do that, set: @@ -62,13 +72,7 @@ Apart from these changes the reported results also have the following configurat * random_eps: 0.1 - percentage of time a random action is taken * noise_eps: 0.1 - std of gaussian noise added to not-completely-random actions -Now training an agent with pre-recorded demonstrations: -```bash -python -m baselines.her.experiment.train --env=FetchPickAndPlace-v0 --n_epochs=1000 --demo_file=/Path/to/demo_file.npz --num_cpu=1 -``` - -This will train a DDPG+HER agent on the `FetchPickAndPlace` environment by using previously generated demonstration data. -To inspect what the agent has learned, use the play script as described above. +These parameters can be changed either in [experiment/config.py](experiment/config.py) or passed to the command line as `--param=value`) ### Results Training with demonstrations helps overcome the exploration problem and achieves a faster and better convergence. The following graphs contrast the difference between training with and without demonstration data, We report the mean Q values vs Epoch and the Success Rate vs Epoch: @@ -78,3 +82,4 @@ Training with demonstrations helps overcome the exploration problem and achieves
Training results for the Fetch Pick and Place task, contrasting training with and without demonstration data.
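The same demonstration workflow can also be driven from Python instead of the command line. Below is a rough sketch against the refactored `learn()` entry point introduced later in this patch; the vec-env setup, demo file path, and overridden parameter are illustrative rather than prescribed:
```python
from baselines import logger
from baselines.common.cmd_util import make_vec_env
from baselines.her.her import learn

logger.configure()  # learn() writes params.json into the logger directory

# HER needs the dict observations, so keep them un-flattened (as run.py does for --alg=her)
env = make_vec_env('FetchPickAndPlace-v1', 'robotics', num_env=1, seed=0,
                   flatten_dict_observations=False)

policy = learn(network='mlp', env=env, total_timesteps=int(2.5e6), seed=0,
               demo_file='data_fetch_random_100.npz',   # enables the behaviour-cloning loss
               override_params={'n_cycles': 20})        # any key from config.DEFAULT_PARAMS
```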
+ diff --git a/baselines/her/ddpg.py b/baselines/her/ddpg.py index 91e91f3..07317e5 100644 --- a/baselines/her/ddpg.py +++ b/baselines/her/ddpg.py @@ -10,13 +10,14 @@ from baselines.her.util import ( from baselines.her.normalizer import Normalizer from baselines.her.replay_buffer import ReplayBuffer from baselines.common.mpi_adam import MpiAdam +from baselines.common import tf_util def dims_to_shapes(input_dims): return {key: tuple([val]) if val > 0 else tuple() for key, val in input_dims.items()} -global demoBuffer #buffer for demonstrations +global DEMO_BUFFER #buffer for demonstrations class DDPG(object): @store_args @@ -94,16 +95,16 @@ class DDPG(object): self._create_network(reuse=reuse) # Configure the replay buffer. - buffer_shapes = {key: (self.T if key != 'o' else self.T+1, *input_shapes[key]) + buffer_shapes = {key: (self.T-1 if key != 'o' else self.T, *input_shapes[key]) for key, val in input_shapes.items()} buffer_shapes['g'] = (buffer_shapes['g'][0], self.dimg) - buffer_shapes['ag'] = (self.T+1, self.dimg) + buffer_shapes['ag'] = (self.T, self.dimg) buffer_size = (self.buffer_size // self.rollout_batch_size) * self.rollout_batch_size self.buffer = ReplayBuffer(buffer_shapes, buffer_size, self.T, self.sample_transitions) - global demoBuffer - demoBuffer = ReplayBuffer(buffer_shapes, buffer_size, self.T, self.sample_transitions) #initialize the demo buffer; in the same way as the primary data buffer + global DEMO_BUFFER + DEMO_BUFFER = ReplayBuffer(buffer_shapes, buffer_size, self.T, self.sample_transitions) #initialize the demo buffer; in the same way as the primary data buffer def _random_action(self, n): return np.random.uniform(low=-self.max_u, high=self.max_u, size=(n, self.dimu)) @@ -119,6 +120,11 @@ class DDPG(object): g = np.clip(g, -self.clip_obs, self.clip_obs) return o, g + def step(self, obs): + actions = self.get_actions(obs['observation'], obs['achieved_goal'], obs['desired_goal']) + return actions, None, None, None + + def get_actions(self, o, ag, g, noise_eps=0., random_eps=0., use_target_net=False, compute_Q=False): o, g = self._preprocess_og(o, ag, g) @@ -151,25 +157,30 @@ class DDPG(object): else: return ret - def initDemoBuffer(self, demoDataFile, update_stats=True): #function that initializes the demo buffer + def init_demo_buffer(self, demoDataFile, update_stats=True): #function that initializes the demo buffer demoData = np.load(demoDataFile) #load the demonstration data from data file info_keys = [key.replace('info_', '') for key in self.input_dims.keys() if key.startswith('info_')] - info_values = [np.empty((self.T, 1, self.input_dims['info_' + key]), np.float32) for key in info_keys] + info_values = [np.empty((self.T - 1, 1, self.input_dims['info_' + key]), np.float32) for key in info_keys] + + demo_data_obs = demoData['obs'] + demo_data_acs = demoData['acs'] + demo_data_info = demoData['info'] for epsd in range(self.num_demo): # we initialize the whole demo buffer at the start of the training obs, acts, goals, achieved_goals = [], [] ,[] ,[] i = 0 - for transition in range(self.T): - obs.append([demoData['obs'][epsd ][transition].get('observation')]) - acts.append([demoData['acs'][epsd][transition]]) - goals.append([demoData['obs'][epsd][transition].get('desired_goal')]) - achieved_goals.append([demoData['obs'][epsd][transition].get('achieved_goal')]) + for transition in range(self.T - 1): + obs.append([demo_data_obs[epsd][transition].get('observation')]) + acts.append([demo_data_acs[epsd][transition]]) + 
goals.append([demo_data_obs[epsd][transition].get('desired_goal')]) + achieved_goals.append([demo_data_obs[epsd][transition].get('achieved_goal')]) for idx, key in enumerate(info_keys): - info_values[idx][transition, i] = demoData['info'][epsd][transition][key] + info_values[idx][transition, i] = demo_data_info[epsd][transition][key] - obs.append([demoData['obs'][epsd][self.T].get('observation')]) - achieved_goals.append([demoData['obs'][epsd][self.T].get('achieved_goal')]) + + obs.append([demo_data_obs[epsd][self.T - 1].get('observation')]) + achieved_goals.append([demo_data_obs[epsd][self.T - 1].get('achieved_goal')]) episode = dict(o=obs, u=acts, @@ -179,10 +190,9 @@ class DDPG(object): episode['info_{}'.format(key)] = value episode = convert_episode_to_batch_major(episode) - global demoBuffer - demoBuffer.store_episode(episode) # create the observation dict and append them into the demonstration buffer - - print("Demo buffer size currently ", demoBuffer.get_current_size()) #print out the demonstration buffer size + global DEMO_BUFFER + DEMO_BUFFER.store_episode(episode) # create the observation dict and append them into the demonstration buffer + logger.debug("Demo buffer size currently ", DEMO_BUFFER.get_current_size()) #print out the demonstration buffer size if update_stats: # add transitions to normalizer to normalize the demo data as well @@ -191,7 +201,7 @@ class DDPG(object): num_normalizing_transitions = transitions_in_episode_batch(episode) transitions = self.sample_transitions(episode, num_normalizing_transitions) - o, o_2, g, ag = transitions['o'], transitions['o_2'], transitions['g'], transitions['ag'] + o, g, ag = transitions['o'], transitions['g'], transitions['ag'] transitions['o'], transitions['g'] = self._preprocess_og(o, ag, g) # No need to preprocess the o_2 and g_2 since this is only used for stats @@ -202,6 +212,8 @@ class DDPG(object): self.g_stats.recompute_stats() episode.clear() + logger.info("Demo buffer size: ", DEMO_BUFFER.get_current_size()) #print out the demonstration buffer size + def store_episode(self, episode_batch, update_stats=True): """ episode_batch: array of batch_size x (T or T+1) x dim_key @@ -217,7 +229,7 @@ class DDPG(object): num_normalizing_transitions = transitions_in_episode_batch(episode_batch) transitions = self.sample_transitions(episode_batch, num_normalizing_transitions) - o, o_2, g, ag = transitions['o'], transitions['o_2'], transitions['g'], transitions['ag'] + o, g, ag = transitions['o'], transitions['g'], transitions['ag'] transitions['o'], transitions['g'] = self._preprocess_og(o, ag, g) # No need to preprocess the o_2 and g_2 since this is only used for stats @@ -251,9 +263,9 @@ class DDPG(object): def sample_batch(self): if self.bc_loss: #use demonstration buffer to sample as well if bc_loss flag is set TRUE transitions = self.buffer.sample(self.batch_size - self.demo_batch_size) - global demoBuffer - transitionsDemo = demoBuffer.sample(self.demo_batch_size) #sample from the demo buffer - for k, values in transitionsDemo.items(): + global DEMO_BUFFER + transitions_demo = DEMO_BUFFER.sample(self.demo_batch_size) #sample from the demo buffer + for k, values in transitions_demo.items(): rolloutV = transitions[k].tolist() for v in values: rolloutV.append(v.tolist()) @@ -302,10 +314,7 @@ class DDPG(object): def _create_network(self, reuse=False): logger.info("Creating a DDPG agent with action space %d x %s..." 
% (self.dimu, self.max_u)) - - self.sess = tf.get_default_session() - if self.sess is None: - self.sess = tf.InteractiveSession() + self.sess = tf_util.get_session() # running averages with tf.variable_scope('o_stats') as vs: @@ -433,3 +442,7 @@ class DDPG(object): assert(len(vars) == len(state["tf"])) node = [tf.assign(var, val) for var, val in zip(vars, state["tf"])] self.sess.run(node) + + def save(self, save_path): + tf_util.save_variables(save_path) + diff --git a/baselines/her/experiment/config.py b/baselines/her/experiment/config.py index 8cc36e6..6370505 100644 --- a/baselines/her/experiment/config.py +++ b/baselines/her/experiment/config.py @@ -1,10 +1,11 @@ +import os import numpy as np import gym from baselines import logger from baselines.her.ddpg import DDPG -from baselines.her.her import make_sample_her_transitions - +from baselines.her.her_sampler import make_sample_her_transitions +from baselines.bench.monitor import Monitor DEFAULT_ENV_PARAMS = { 'FetchReach-v1': { @@ -72,16 +73,32 @@ def cached_make_env(make_env): def prepare_params(kwargs): # DDPG params ddpg_params = dict() - env_name = kwargs['env_name'] - def make_env(): - return gym.make(env_name) + def make_env(subrank=None): + env = gym.make(env_name) + if subrank is not None and logger.get_dir() is not None: + try: + from mpi4py import MPI + mpi_rank = MPI.COMM_WORLD.Get_rank() + except ImportError: + MPI = None + mpi_rank = 0 + logger.warn('Running with a single MPI process. This should work, but the results may differ from the ones publshed in Plappert et al.') + + max_episode_steps = env._max_episode_steps + env = Monitor(env, + os.path.join(logger.get_dir(), str(mpi_rank) + '.' + str(subrank)), + allow_early_resets=True) + # hack to re-expose _max_episode_steps (ideally should replace reliance on it downstream) + env = gym.wrappers.TimeLimit(env, max_episode_steps=max_episode_steps) + return env + kwargs['make_env'] = make_env tmp_env = cached_make_env(kwargs['make_env']) assert hasattr(tmp_env, '_max_episode_steps') kwargs['T'] = tmp_env._max_episode_steps - tmp_env.reset() + kwargs['max_u'] = np.array(kwargs['max_u']) if isinstance(kwargs['max_u'], list) else kwargs['max_u'] kwargs['gamma'] = 1. - 1. 
/ kwargs['T'] if 'lr' in kwargs: diff --git a/baselines/her/experiment/data_generation/fetch_data_generation.py b/baselines/her/experiment/data_generation/fetch_data_generation.py index eecd516..0a0a755 100644 --- a/baselines/her/experiment/data_generation/fetch_data_generation.py +++ b/baselines/her/experiment/data_generation/fetch_data_generation.py @@ -1,18 +1,5 @@ import gym -import time -import random import numpy as np -import rospy -import roslaunch - -from random import randint -from std_srvs.srv import Empty -from sensor_msgs.msg import JointState -from geometry_msgs.msg import PoseStamped -from geometry_msgs.msg import Pose -from std_msgs.msg import Float64 -from controller_manager_msgs.srv import SwitchController -from gym.utils import seeding """Data generation for the case of a single block pick and place in Fetch Env""" @@ -22,7 +9,7 @@ observations = [] infos = [] def main(): - env = gym.make('FetchPickAndPlace-v0') + env = gym.make('FetchPickAndPlace-v1') numItr = 100 initStateSpace = "random" env.reset() @@ -31,21 +18,19 @@ def main(): obs = env.reset() print("ITERATION NUMBER ", len(actions)) goToGoal(env, obs) - + fileName = "data_fetch" fileName += "_" + initStateSpace fileName += "_" + str(numItr) fileName += ".npz" - + np.savez_compressed(fileName, acs=actions, obs=observations, info=infos) # save the file def goToGoal(env, lastObs): goal = lastObs['desired_goal'] objectPos = lastObs['observation'][3:6] - gripperPos = lastObs['observation'][:3] - gripperState = lastObs['observation'][9:11] object_rel_pos = lastObs['observation'][6:9] episodeAcs = [] episodeObs = [] @@ -53,7 +38,7 @@ def goToGoal(env, lastObs): object_oriented_goal = object_rel_pos.copy() object_oriented_goal[2] += 0.03 # first make the gripper go slightly above the object - + timeStep = 0 #count the total number of timesteps episodeObs.append(lastObs) @@ -76,8 +61,6 @@ def goToGoal(env, lastObs): episodeObs.append(obsDataNew) objectPos = obsDataNew['observation'][3:6] - gripperPos = obsDataNew['observation'][:3] - gripperState = obsDataNew['observation'][9:11] object_rel_pos = obsDataNew['observation'][6:9] while np.linalg.norm(object_rel_pos) >= 0.005 and timeStep <= env._max_episode_steps : @@ -96,8 +79,6 @@ def goToGoal(env, lastObs): episodeObs.append(obsDataNew) objectPos = obsDataNew['observation'][3:6] - gripperPos = obsDataNew['observation'][:3] - gripperState = obsDataNew['observation'][9:11] object_rel_pos = obsDataNew['observation'][6:9] @@ -117,8 +98,6 @@ def goToGoal(env, lastObs): episodeObs.append(obsDataNew) objectPos = obsDataNew['observation'][3:6] - gripperPos = obsDataNew['observation'][:3] - gripperState = obsDataNew['observation'][9:11] object_rel_pos = obsDataNew['observation'][6:9] while True: #limit the number of timesteps in the episode to a fixed duration @@ -134,8 +113,6 @@ def goToGoal(env, lastObs): episodeObs.append(obsDataNew) objectPos = obsDataNew['observation'][3:6] - gripperPos = obsDataNew['observation'][:3] - gripperState = obsDataNew['observation'][9:11] object_rel_pos = obsDataNew['observation'][6:9] if timeStep >= env._max_episode_steps: break diff --git a/baselines/her/experiment/play.py b/baselines/her/experiment/play.py index a6f94e9..8989c92 100644 --- a/baselines/her/experiment/play.py +++ b/baselines/her/experiment/play.py @@ -1,3 +1,4 @@ +# DEPRECATED, use --play flag to baselines.run instead import click import numpy as np import pickle diff --git a/baselines/her/experiment/plot.py b/baselines/her/experiment/plot.py index a14872d..542de0e 100644 --- 
a/baselines/her/experiment/plot.py +++ b/baselines/her/experiment/plot.py @@ -1,3 +1,5 @@ +# DEPRECATED, use baselines.common.plot_util instead + import os import matplotlib.pyplot as plt import numpy as np diff --git a/baselines/her/experiment/train.py b/baselines/her/experiment/train.py deleted file mode 100644 index 82a11f0..0000000 --- a/baselines/her/experiment/train.py +++ /dev/null @@ -1,194 +0,0 @@ -import os -import sys - -import click -import numpy as np -import json -from mpi4py import MPI - -from baselines import logger -from baselines.common import set_global_seeds -from baselines.common.mpi_moments import mpi_moments -import baselines.her.experiment.config as config -from baselines.her.rollout import RolloutWorker -from baselines.her.util import mpi_fork - -from subprocess import CalledProcessError - - -def mpi_average(value): - if value == []: - value = [0.] - if not isinstance(value, list): - value = [value] - return mpi_moments(np.array(value))[0] - - -def train(policy, rollout_worker, evaluator, - n_epochs, n_test_rollouts, n_cycles, n_batches, policy_save_interval, - save_policies, demo_file, **kwargs): - rank = MPI.COMM_WORLD.Get_rank() - - latest_policy_path = os.path.join(logger.get_dir(), 'policy_latest.pkl') - best_policy_path = os.path.join(logger.get_dir(), 'policy_best.pkl') - periodic_policy_path = os.path.join(logger.get_dir(), 'policy_{}.pkl') - - logger.info("Training...") - best_success_rate = -1 - - if policy.bc_loss == 1: policy.initDemoBuffer(demo_file) #initialize demo buffer if training with demonstrations - for epoch in range(n_epochs): - # train - rollout_worker.clear_history() - for _ in range(n_cycles): - episode = rollout_worker.generate_rollouts() - policy.store_episode(episode) - for _ in range(n_batches): - policy.train() - policy.update_target_net() - - # test - evaluator.clear_history() - for _ in range(n_test_rollouts): - evaluator.generate_rollouts() - - # record logs - logger.record_tabular('epoch', epoch) - for key, val in evaluator.logs('test'): - logger.record_tabular(key, mpi_average(val)) - for key, val in rollout_worker.logs('train'): - logger.record_tabular(key, mpi_average(val)) - for key, val in policy.logs(): - logger.record_tabular(key, mpi_average(val)) - - if rank == 0: - logger.dump_tabular() - - # save the policy if it's better than the previous ones - success_rate = mpi_average(evaluator.current_success_rate()) - if rank == 0 and success_rate >= best_success_rate and save_policies: - best_success_rate = success_rate - logger.info('New best success rate: {}. Saving policy to {} ...'.format(best_success_rate, best_policy_path)) - evaluator.save_policy(best_policy_path) - evaluator.save_policy(latest_policy_path) - if rank == 0 and policy_save_interval > 0 and epoch % policy_save_interval == 0 and save_policies: - policy_path = periodic_policy_path.format(epoch) - logger.info('Saving periodic policy to {} ...'.format(policy_path)) - evaluator.save_policy(policy_path) - - # make sure that different threads have different seeds - local_uniform = np.random.uniform(size=(1,)) - root_uniform = local_uniform.copy() - MPI.COMM_WORLD.Bcast(root_uniform, root=0) - if rank != 0: - assert local_uniform[0] != root_uniform[0] - - -def launch( - env, logdir, n_epochs, num_cpu, seed, replay_strategy, policy_save_interval, clip_return, - demo_file, override_params={}, save_policies=True -): - # Fork for multi-CPU MPI implementation. 
- if num_cpu > 1: - try: - whoami = mpi_fork(num_cpu, ['--bind-to', 'core']) - except CalledProcessError: - # fancy version of mpi call failed, try simple version - whoami = mpi_fork(num_cpu) - - if whoami == 'parent': - sys.exit(0) - import baselines.common.tf_util as U - U.single_threaded_session().__enter__() - rank = MPI.COMM_WORLD.Get_rank() - - # Configure logging - if rank == 0: - if logdir or logger.get_dir() is None: - logger.configure(dir=logdir) - else: - logger.configure() - logdir = logger.get_dir() - assert logdir is not None - os.makedirs(logdir, exist_ok=True) - - # Seed everything. - rank_seed = seed + 1000000 * rank - set_global_seeds(rank_seed) - - # Prepare params. - params = config.DEFAULT_PARAMS - params['env_name'] = env - params['replay_strategy'] = replay_strategy - if env in config.DEFAULT_ENV_PARAMS: - params.update(config.DEFAULT_ENV_PARAMS[env]) # merge env-specific parameters in - params.update(**override_params) # makes it possible to override any parameter - with open(os.path.join(logger.get_dir(), 'params.json'), 'w') as f: - json.dump(params, f) - params = config.prepare_params(params) - config.log_params(params, logger=logger) - - if num_cpu == 1: - logger.warn() - logger.warn('*** Warning ***') - logger.warn( - 'You are running HER with just a single MPI worker. This will work, but the ' + - 'experiments that we report in Plappert et al. (2018, https://arxiv.org/abs/1802.09464) ' + - 'were obtained with --num_cpu 19. This makes a significant difference and if you ' + - 'are looking to reproduce those results, be aware of this. Please also refer to ' + - 'https://github.com/openai/baselines/issues/314 for further details.') - logger.warn('****************') - logger.warn() - - dims = config.configure_dims(params) - policy = config.configure_ddpg(dims=dims, params=params, clip_return=clip_return) - - rollout_params = { - 'exploit': False, - 'use_target_net': False, - 'use_demo_states': True, - 'compute_Q': False, - 'T': params['T'], - } - - eval_params = { - 'exploit': True, - 'use_target_net': params['test_with_polyak'], - 'use_demo_states': False, - 'compute_Q': True, - 'T': params['T'], - } - - for name in ['T', 'rollout_batch_size', 'gamma', 'noise_eps', 'random_eps']: - rollout_params[name] = params[name] - eval_params[name] = params[name] - - rollout_worker = RolloutWorker(params['make_env'], policy, dims, logger, **rollout_params) - rollout_worker.seed(rank_seed) - - evaluator = RolloutWorker(params['make_env'], policy, dims, logger, **eval_params) - evaluator.seed(rank_seed) - - train( - logdir=logdir, policy=policy, rollout_worker=rollout_worker, - evaluator=evaluator, n_epochs=n_epochs, n_test_rollouts=params['n_test_rollouts'], - n_cycles=params['n_cycles'], n_batches=params['n_batches'], - policy_save_interval=policy_save_interval, save_policies=save_policies, demo_file=demo_file) - - -@click.command() -@click.option('--env', type=str, default='FetchReach-v1', help='the name of the OpenAI Gym environment that you want to train on') -@click.option('--logdir', type=str, default=None, help='the path to where logs and policy pickles should go. 
If not specified, creates a folder in /tmp/') -@click.option('--n_epochs', type=int, default=50, help='the number of training epochs to run') -@click.option('--num_cpu', type=int, default=1, help='the number of CPU cores to use (using MPI)') -@click.option('--seed', type=int, default=0, help='the random seed used to seed both the environment and the training code') -@click.option('--policy_save_interval', type=int, default=5, help='the interval with which policy pickles are saved. If set to 0, only the best and latest policy will be pickled.') -@click.option('--replay_strategy', type=click.Choice(['future', 'none']), default='future', help='the HER replay strategy to be used. "future" uses HER, "none" disables HER.') -@click.option('--clip_return', type=int, default=1, help='whether or not returns should be clipped') -@click.option('--demo_file', type=str, default = 'PATH/TO/DEMO/DATA/FILE.npz', help='demo data file path') -def main(**kwargs): - launch(**kwargs) - - -if __name__ == '__main__': - main() diff --git a/baselines/her/her.py b/baselines/her/her.py index 76f3c34..ec96f9e 100644 --- a/baselines/her/her.py +++ b/baselines/her/her.py @@ -1,63 +1,193 @@ +import os + +import click import numpy as np +import json +from mpi4py import MPI + +from baselines import logger +from baselines.common import set_global_seeds, tf_util +from baselines.common.mpi_moments import mpi_moments +import baselines.her.experiment.config as config +from baselines.her.rollout import RolloutWorker + +def mpi_average(value): + if not isinstance(value, list): + value = [value] + if not any(value): + value = [0.] + return mpi_moments(np.array(value))[0] -def make_sample_her_transitions(replay_strategy, replay_k, reward_fun): - """Creates a sample function that can be used for HER experience replay. +def train(*, policy, rollout_worker, evaluator, + n_epochs, n_test_rollouts, n_cycles, n_batches, policy_save_interval, + save_path, demo_file, **kwargs): + rank = MPI.COMM_WORLD.Get_rank() - Args: - replay_strategy (in ['future', 'none']): the HER replay strategy; if set to 'none', - regular DDPG experience replay is used - replay_k (int): the ratio between HER replays and regular replays (e.g. k = 4 -> 4 times - as many HER replays as regular replays are used) - reward_fun (function): function to re-compute the reward with substituted goals - """ - if replay_strategy == 'future': - future_p = 1 - (1. / (1 + replay_k)) - else: # 'replay_strategy' == 'none' - future_p = 0 + if save_path: + latest_policy_path = os.path.join(save_path, 'policy_latest.pkl') + best_policy_path = os.path.join(save_path, 'policy_best.pkl') + periodic_policy_path = os.path.join(save_path, 'policy_{}.pkl') - def _sample_her_transitions(episode_batch, batch_size_in_transitions): - """episode_batch is {key: array(buffer_size x T x dim_key)} - """ - T = episode_batch['u'].shape[1] - rollout_batch_size = episode_batch['u'].shape[0] - batch_size = batch_size_in_transitions + logger.info("Training...") + best_success_rate = -1 - # Select which episodes and time steps to use. - episode_idxs = np.random.randint(0, rollout_batch_size, batch_size) - t_samples = np.random.randint(T, size=batch_size) - transitions = {key: episode_batch[key][episode_idxs, t_samples].copy() - for key in episode_batch.keys()} + if policy.bc_loss == 1: policy.init_demo_buffer(demo_file) #initialize demo buffer if training with demonstrations - # Select future time indexes proportional with probability future_p. 
These - # will be used for HER replay by substituting in future goals. - her_indexes = np.where(np.random.uniform(size=batch_size) < future_p) - future_offset = np.random.uniform(size=batch_size) * (T - t_samples) - future_offset = future_offset.astype(int) - future_t = (t_samples + 1 + future_offset)[her_indexes] + # num_timesteps = n_epochs * n_cycles * rollout_length * number of rollout workers + for epoch in range(n_epochs): + # train + rollout_worker.clear_history() + for _ in range(n_cycles): + episode = rollout_worker.generate_rollouts() + policy.store_episode(episode) + for _ in range(n_batches): + policy.train() + policy.update_target_net() - # Replace goal with achieved goal but only for the previously-selected - # HER transitions (as defined by her_indexes). For the other transitions, - # keep the original goal. - future_ag = episode_batch['ag'][episode_idxs[her_indexes], future_t] - transitions['g'][her_indexes] = future_ag + # test + evaluator.clear_history() + for _ in range(n_test_rollouts): + evaluator.generate_rollouts() - # Reconstruct info dictionary for reward computation. - info = {} - for key, value in transitions.items(): - if key.startswith('info_'): - info[key.replace('info_', '')] = value + # record logs + logger.record_tabular('epoch', epoch) + for key, val in evaluator.logs('test'): + logger.record_tabular(key, mpi_average(val)) + for key, val in rollout_worker.logs('train'): + logger.record_tabular(key, mpi_average(val)) + for key, val in policy.logs(): + logger.record_tabular(key, mpi_average(val)) - # Re-compute reward since we may have substituted the goal. - reward_params = {k: transitions[k] for k in ['ag_2', 'g']} - reward_params['info'] = info - transitions['r'] = reward_fun(**reward_params) + if rank == 0: + logger.dump_tabular() - transitions = {k: transitions[k].reshape(batch_size, *transitions[k].shape[1:]) - for k in transitions.keys()} + # save the policy if it's better than the previous ones + success_rate = mpi_average(evaluator.current_success_rate()) + if rank == 0 and success_rate >= best_success_rate and save_path: + best_success_rate = success_rate + logger.info('New best success rate: {}. Saving policy to {} ...'.format(best_success_rate, best_policy_path)) + evaluator.save_policy(best_policy_path) + evaluator.save_policy(latest_policy_path) + if rank == 0 and policy_save_interval > 0 and epoch % policy_save_interval == 0 and save_path: + policy_path = periodic_policy_path.format(epoch) + logger.info('Saving periodic policy to {} ...'.format(policy_path)) + evaluator.save_policy(policy_path) - assert(transitions['u'].shape[0] == batch_size_in_transitions) + # make sure that different threads have different seeds + local_uniform = np.random.uniform(size=(1,)) + root_uniform = local_uniform.copy() + MPI.COMM_WORLD.Bcast(root_uniform, root=0) + if rank != 0: + assert local_uniform[0] != root_uniform[0] - return transitions + return policy - return _sample_her_transitions + +def learn(*, network, env, total_timesteps, + seed=None, + eval_env=None, + replay_strategy='future', + policy_save_interval=5, + clip_return=True, + demo_file=None, + override_params=None, + load_path=None, + save_path=None, + **kwargs +): + + override_params = override_params or {} + if MPI is not None: + rank = MPI.COMM_WORLD.Get_rank() + num_cpu = MPI.COMM_WORLD.Get_size() + + # Seed everything. + rank_seed = seed + 1000000 * rank if seed is not None else None + set_global_seeds(rank_seed) + + # Prepare params. 
+ params = config.DEFAULT_PARAMS + env_name = env.specs[0].id + params['env_name'] = env_name + params['replay_strategy'] = replay_strategy + if env_name in config.DEFAULT_ENV_PARAMS: + params.update(config.DEFAULT_ENV_PARAMS[env_name]) # merge env-specific parameters in + params.update(**override_params) # makes it possible to override any parameter + with open(os.path.join(logger.get_dir(), 'params.json'), 'w') as f: + json.dump(params, f) + params = config.prepare_params(params) + params['rollout_batch_size'] = env.num_envs + + if demo_file is not None: + params['bc_loss'] = 1 + params.update(kwargs) + + config.log_params(params, logger=logger) + + if num_cpu == 1: + logger.warn() + logger.warn('*** Warning ***') + logger.warn( + 'You are running HER with just a single MPI worker. This will work, but the ' + + 'experiments that we report in Plappert et al. (2018, https://arxiv.org/abs/1802.09464) ' + + 'were obtained with --num_cpu 19. This makes a significant difference and if you ' + + 'are looking to reproduce those results, be aware of this. Please also refer to ' + + 'https://github.com/openai/baselines/issues/314 for further details.') + logger.warn('****************') + logger.warn() + + dims = config.configure_dims(params) + policy = config.configure_ddpg(dims=dims, params=params, clip_return=clip_return) + if load_path is not None: + tf_util.load_variables(load_path) + + rollout_params = { + 'exploit': False, + 'use_target_net': False, + 'use_demo_states': True, + 'compute_Q': False, + 'T': params['T'], + } + + eval_params = { + 'exploit': True, + 'use_target_net': params['test_with_polyak'], + 'use_demo_states': False, + 'compute_Q': True, + 'T': params['T'], + } + + for name in ['T', 'rollout_batch_size', 'gamma', 'noise_eps', 'random_eps']: + rollout_params[name] = params[name] + eval_params[name] = params[name] + + eval_env = eval_env or env + + rollout_worker = RolloutWorker(env, policy, dims, logger, monitor=True, **rollout_params) + evaluator = RolloutWorker(eval_env, policy, dims, logger, **eval_params) + + n_cycles = params['n_cycles'] + n_epochs = total_timesteps // n_cycles // rollout_worker.T // rollout_worker.rollout_batch_size + + return train( + save_path=save_path, policy=policy, rollout_worker=rollout_worker, + evaluator=evaluator, n_epochs=n_epochs, n_test_rollouts=params['n_test_rollouts'], + n_cycles=params['n_cycles'], n_batches=params['n_batches'], + policy_save_interval=policy_save_interval, demo_file=demo_file) + + +@click.command() +@click.option('--env', type=str, default='FetchReach-v1', help='the name of the OpenAI Gym environment that you want to train on') +@click.option('--total_timesteps', type=int, default=int(5e5), help='the number of timesteps to run') +@click.option('--seed', type=int, default=0, help='the random seed used to seed both the environment and the training code') +@click.option('--policy_save_interval', type=int, default=5, help='the interval with which policy pickles are saved. If set to 0, only the best and latest policy will be pickled.') +@click.option('--replay_strategy', type=click.Choice(['future', 'none']), default='future', help='the HER replay strategy to be used. 
"future" uses HER, "none" disables HER.') +@click.option('--clip_return', type=int, default=1, help='whether or not returns should be clipped') +@click.option('--demo_file', type=str, default = 'PATH/TO/DEMO/DATA/FILE.npz', help='demo data file path') +def main(**kwargs): + learn(**kwargs) + + +if __name__ == '__main__': + main() diff --git a/baselines/her/her_sampler.py b/baselines/her/her_sampler.py new file mode 100644 index 0000000..76f3c34 --- /dev/null +++ b/baselines/her/her_sampler.py @@ -0,0 +1,63 @@ +import numpy as np + + +def make_sample_her_transitions(replay_strategy, replay_k, reward_fun): + """Creates a sample function that can be used for HER experience replay. + + Args: + replay_strategy (in ['future', 'none']): the HER replay strategy; if set to 'none', + regular DDPG experience replay is used + replay_k (int): the ratio between HER replays and regular replays (e.g. k = 4 -> 4 times + as many HER replays as regular replays are used) + reward_fun (function): function to re-compute the reward with substituted goals + """ + if replay_strategy == 'future': + future_p = 1 - (1. / (1 + replay_k)) + else: # 'replay_strategy' == 'none' + future_p = 0 + + def _sample_her_transitions(episode_batch, batch_size_in_transitions): + """episode_batch is {key: array(buffer_size x T x dim_key)} + """ + T = episode_batch['u'].shape[1] + rollout_batch_size = episode_batch['u'].shape[0] + batch_size = batch_size_in_transitions + + # Select which episodes and time steps to use. + episode_idxs = np.random.randint(0, rollout_batch_size, batch_size) + t_samples = np.random.randint(T, size=batch_size) + transitions = {key: episode_batch[key][episode_idxs, t_samples].copy() + for key in episode_batch.keys()} + + # Select future time indexes proportional with probability future_p. These + # will be used for HER replay by substituting in future goals. + her_indexes = np.where(np.random.uniform(size=batch_size) < future_p) + future_offset = np.random.uniform(size=batch_size) * (T - t_samples) + future_offset = future_offset.astype(int) + future_t = (t_samples + 1 + future_offset)[her_indexes] + + # Replace goal with achieved goal but only for the previously-selected + # HER transitions (as defined by her_indexes). For the other transitions, + # keep the original goal. + future_ag = episode_batch['ag'][episode_idxs[her_indexes], future_t] + transitions['g'][her_indexes] = future_ag + + # Reconstruct info dictionary for reward computation. + info = {} + for key, value in transitions.items(): + if key.startswith('info_'): + info[key.replace('info_', '')] = value + + # Re-compute reward since we may have substituted the goal. 
+ reward_params = {k: transitions[k] for k in ['ag_2', 'g']} + reward_params['info'] = info + transitions['r'] = reward_fun(**reward_params) + + transitions = {k: transitions[k].reshape(batch_size, *transitions[k].shape[1:]) + for k in transitions.keys()} + + assert(transitions['u'].shape[0] == batch_size_in_transitions) + + return transitions + + return _sample_her_transitions diff --git a/baselines/her/rollout.py b/baselines/her/rollout.py index e33b92a..4ffeee5 100644 --- a/baselines/her/rollout.py +++ b/baselines/her/rollout.py @@ -2,7 +2,6 @@ from collections import deque import numpy as np import pickle -from mujoco_py import MujocoException from baselines.her.util import convert_episode_to_batch_major, store_args @@ -10,9 +9,9 @@ from baselines.her.util import convert_episode_to_batch_major, store_args class RolloutWorker: @store_args - def __init__(self, make_env, policy, dims, logger, T, rollout_batch_size=1, + def __init__(self, venv, policy, dims, logger, T, rollout_batch_size=1, exploit=False, use_target_net=False, compute_Q=False, noise_eps=0, - random_eps=0, history_len=100, render=False, **kwargs): + random_eps=0, history_len=100, render=False, monitor=False, **kwargs): """Rollout worker generates experience by interacting with one or many environments. Args: @@ -31,7 +30,7 @@ class RolloutWorker: history_len (int): length of history for statistics smoothing render (boolean): whether or not to render the rollouts """ - self.envs = [make_env() for _ in range(rollout_batch_size)] + assert self.T > 0 self.info_keys = [key.replace('info_', '') for key in dims.keys() if key.startswith('info_')] @@ -40,26 +39,14 @@ class RolloutWorker: self.Q_history = deque(maxlen=history_len) self.n_episodes = 0 - self.g = np.empty((self.rollout_batch_size, self.dims['g']), np.float32) # goals - self.initial_o = np.empty((self.rollout_batch_size, self.dims['o']), np.float32) # observations - self.initial_ag = np.empty((self.rollout_batch_size, self.dims['g']), np.float32) # achieved goals self.reset_all_rollouts() self.clear_history() - def reset_rollout(self, i): - """Resets the `i`-th rollout environment, re-samples a new goal, and updates the `initial_o` - and `g` arrays accordingly. - """ - obs = self.envs[i].reset() - self.initial_o[i] = obs['observation'] - self.initial_ag[i] = obs['achieved_goal'] - self.g[i] = obs['desired_goal'] - def reset_all_rollouts(self): - """Resets all `rollout_batch_size` rollout workers. 
- """ - for i in range(self.rollout_batch_size): - self.reset_rollout(i) + self.obs_dict = self.venv.reset() + self.initial_o = self.obs_dict['observation'] + self.initial_ag = self.obs_dict['achieved_goal'] + self.g = self.obs_dict['desired_goal'] def generate_rollouts(self): """Performs `rollout_batch_size` rollouts in parallel for time horizon `T` with the current @@ -75,7 +62,8 @@ class RolloutWorker: # generate episodes obs, achieved_goals, acts, goals, successes = [], [], [], [], [] - info_values = [np.empty((self.T, self.rollout_batch_size, self.dims['info_' + key]), np.float32) for key in self.info_keys] + dones = [] + info_values = [np.empty((self.T - 1, self.rollout_batch_size, self.dims['info_' + key]), np.float32) for key in self.info_keys] Qs = [] for t in range(self.T): policy_output = self.policy.get_actions( @@ -99,27 +87,27 @@ class RolloutWorker: ag_new = np.empty((self.rollout_batch_size, self.dims['g'])) success = np.zeros(self.rollout_batch_size) # compute new states and observations - for i in range(self.rollout_batch_size): - try: - # We fully ignore the reward here because it will have to be re-computed - # for HER. - curr_o_new, _, _, info = self.envs[i].step(u[i]) - if 'is_success' in info: - success[i] = info['is_success'] - o_new[i] = curr_o_new['observation'] - ag_new[i] = curr_o_new['achieved_goal'] - for idx, key in enumerate(self.info_keys): - info_values[idx][t, i] = info[key] - if self.render: - self.envs[i].render() - except MujocoException as e: - return self.generate_rollouts() + obs_dict_new, _, done, info = self.venv.step(u) + o_new = obs_dict_new['observation'] + ag_new = obs_dict_new['achieved_goal'] + success = np.array([i.get('is_success', 0.0) for i in info]) + + if any(done): + # here we assume all environments are done is ~same number of steps, so we terminate rollouts whenever any of the envs returns done + # trick with using vecenvs is not to add the obs from the environments that are "done", because those are already observations + # after a reset + break + + for i, info_dict in enumerate(info): + for idx, key in enumerate(self.info_keys): + info_values[idx][t, i] = info[i][key] if np.isnan(o_new).any(): self.logger.warn('NaN caught during rollout generation. Trying again...') self.reset_all_rollouts() return self.generate_rollouts() + dones.append(done) obs.append(o.copy()) achieved_goals.append(ag.copy()) successes.append(success.copy()) @@ -129,7 +117,6 @@ class RolloutWorker: ag[...] = ag_new obs.append(o.copy()) achieved_goals.append(ag.copy()) - self.initial_o[:] = o episode = dict(o=obs, u=acts, @@ -181,8 +168,3 @@ class RolloutWorker: else: return logs - def seed(self, seed): - """Seeds each environment with a distinct seed derived from the passed in global seed. 
- """ - for idx, env in enumerate(self.envs): - env.seed(seed + 1000 * idx) diff --git a/baselines/run.py b/baselines/run.py index 609de6e..a493071 100644 --- a/baselines/run.py +++ b/baselines/run.py @@ -110,7 +110,8 @@ def build_env(args): config.gpu_options.allow_growth = True get_session(config=config) - env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale) + flatten_dict_observations = alg not in {'her'} + env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations) if env_type == 'mujoco': env = VecNormalize(env) @@ -206,11 +207,16 @@ def main(args): logger.log("Running trained model") env = build_env(args) obs = env.reset() - def initialize_placeholders(nlstm=128,**kwargs): - return np.zeros((args.num_env or 1, 2*nlstm)), np.zeros((1)) - state, dones = initialize_placeholders(**extra_args) + + state = model.initial_state if hasattr(model, 'initial_state') else None + dones = np.zeros((1,)) + while True: - actions, _, state, _ = model.step(obs,S=state, M=dones) + if state is not None: + actions, _, state, _ = model.step(obs,S=state, M=dones) + else: + actions, _, _, _ = model.step(obs) + obs, _, done, _ = env.step(actions) env.render() done = done.any() if isinstance(done, np.ndarray) else done diff --git a/setup.cfg b/setup.cfg index b35a7e4..20d822e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,6 +3,5 @@ select = F,E999,W291,W293 exclude = .git, __pycache__, - baselines/her, baselines/ppo1, baselines/bench,