Files
baselines/baselines/common/vec_env/shmem_vec_env.py
pzhokhov 6c44fb28fe refactor HER - phase 1 (#767)
* joshim5 changes (width and height to WarpFrame wrapper)

* match network output with action distribution via a linear layer only if necessary (#167)

* support color vs. grayscale option in WarpFrame wrapper (#166)

* support color vs. grayscale option in WarpFrame wrapper

* Support color in other wrappers

* Updated per Peters suggestions

* fixing test failures

* ppo2 with microbatches (#168)

* pass microbatch_size to the model during construction

* microbatch fixes and test (#169)

* microbatch fixes and test

* tiny cleanup

* added assertions to the test

* vpg-related fix

* Peterz joshim5 subclass ppo2 model (#170)

* microbatch fixes and test

* tiny cleanup

* added assertions to the test

* vpg-related fix

* subclassing the model to make microbatched version of model WIP

* made microbatched model a subclass of ppo2 Model

* flake8 complaint

* mpi-less ppo2 (resolving merge conflict)

* flake8 and mpi4py imports in ppo2/model.py

* more un-mpying

* merge master

* updates to the benchmark viewer code + autopep8 (#184)

* viz docs and syntactic sugar wip

* update viewer yaml to use persistent volume claims

* move plot_util to baselines.common, update links

* use 1Tb hard drive for results viewer

* small updates to benchmark visualizer code

* autopep8

* autopep8

* any folder can be a benchmark

* massage games image a little bit

* fixed --preload option in app.py

* remove preload from run_viewer.sh

* remove pdb breakpoints

* update bench-viewer.yaml

* fixed bug (#185)

* fixed bug 

it's wrong to do the else statement, because no other nodes would start.

* changed the fix slightly

* Refactor her phase 1 (#194)

* add monitor to the rollout envs in her RUN BENCHMARKS her

* Slice -> Slide in her benchmarks RUN BENCHMARKS her

* run her benchmark for 200 epochs

* dummy commit to RUN BENCHMARKS her

* her benchmark for 500 epochs RUN BENCHMARKS her

* add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her

* add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her

* add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her

* disable saving of policies in her benchmark RUN BENCHMARKS her

* run fetch benchmarks with ppo2 and ddpg RUN BENCHMARKS Fetch

* run fetch benchmarks with ppo2 and ddpg RUN BENCHMARKS Fetch

* launcher refactor wip

* wip

* her works on FetchReach

* her runner refactor RUN BENCHMARKS Fetch1M

* unit test for her

* fixing warnings in mpi_average in her, skip test_fetchreach if mujoco is not present

* pickle-based serialization in her

* remove extra import from subproc_vec_env.py

* investigating differences in rollout.py

* try with old rollout code RUN BENCHMARKS her

* temporarily use DummyVecEnv in cmd_util.py RUN BENCHMARKS her

* dummy commit to RUN BENCHMARKS her

* set info_values in rollout worker in her RUN BENCHMARKS her

* bug in rollout_new.py RUN BENCHMARKS her

* fixed bug in rollout_new.py RUN BENCHMARKS her

* do not use last step because vecenv calls reset and returns obs after reset RUN BENCHMARKS her

* updated buffer sizes RUN BENCHMARKS her

* fixed loading/saving via joblib

* dust off learning from demonstrations in HER, docs, refactor

* add deprecation notice on her play and plot files

* address comments by Matthias
2018-12-19 14:44:08 -08:00

139 lines
4.9 KiB
Python

"""
An interface for asynchronous vectorized environments.
"""
from multiprocessing import Pipe, Array, Process
import numpy as np
from . import VecEnv, CloudpickleWrapper
import ctypes
from baselines import logger
from .util import dict_to_obs, obs_space_info, obs_to_dict
# Maps a numpy scalar type (the ``.type`` of an observation dtype) to the
# ctypes element type used for the shared-memory array holding observations of
# that dtype.  Every pair has the same element size, so a buffer allocated with
# the ctypes type can be reinterpreted with the numpy dtype via np.frombuffer.
#
# The boolean key must be ``np.bool_``: that is what ``np.dtype(bool).type``
# returns.  The old ``np.bool`` alias was the builtin ``bool`` (never matching
# a dtype's ``.type``) and was removed in NumPy 1.24.
_NP_TO_CT = {
    np.float64: ctypes.c_double,
    np.float32: ctypes.c_float,
    np.int64: ctypes.c_int64,
    np.int32: ctypes.c_int32,
    np.int8: ctypes.c_int8,
    np.uint8: ctypes.c_char,
    np.bool_: ctypes.c_bool,
}
class ShmemVecEnv(VecEnv):
    """
    Optimized version of SubprocVecEnv that uses shared variables to communicate observations.

    Every environment is driven by its own worker process (see _subproc_worker).
    Commands and the (reward, done, info) results travel over a Pipe, while
    observations are written by the worker into per-environment
    multiprocessing Arrays and read back here with np.frombuffer.
    """

    def __init__(self, env_fns, spaces=None):
        """
        If you don't specify observation_space, we'll have to create a dummy
        environment to get it.

        env_fns: sequence of zero-argument callables, each returning one environment
        spaces: optional (observation_space, action_space) pair; when falsy the
            spaces are read from a temporary environment built with env_fns[0]
        """
        if spaces:
            observation_space, action_space = spaces
        else:
            logger.log('Creating dummy env object to get spaces')
            with logger.scoped_configure(format_strs=[]):
                dummy = env_fns[0]()
                observation_space, action_space = dummy.observation_space, dummy.action_space
                dummy.close()
                del dummy
        VecEnv.__init__(self, len(env_fns), observation_space, action_space)
        self.obs_keys, self.obs_shapes, self.obs_dtypes = obs_space_info(observation_space)
        # One flat shared array per (environment, observation key); its length is
        # the number of elements of that key's observation shape.
        self.obs_bufs = [
            {k: Array(_NP_TO_CT[self.obs_dtypes[k].type], int(np.prod(self.obs_shapes[k]))) for k in self.obs_keys}
            for _ in env_fns]
        self.parent_pipes = []
        self.procs = []
        for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
            wrapped_fn = CloudpickleWrapper(env_fn)
            parent_pipe, child_pipe = Pipe()
            proc = Process(target=_subproc_worker,
                           args=(child_pipe, parent_pipe, wrapped_fn, obs_buf, self.obs_shapes, self.obs_dtypes, self.obs_keys))
            proc.daemon = True
            self.procs.append(proc)
            self.parent_pipes.append(parent_pipe)
            proc.start()
            # The worker owns the child end; the parent only keeps parent_pipe.
            child_pipe.close()
        # True between step_async() and the matching step_wait().
        self.waiting_step = False
        # Reading .spec requires instantiating each environment in this process;
        # close every probe instance right away so it does not linger here for
        # the lifetime of the vectorized env.
        self.specs = []
        for env_fn in env_fns:
            probe = env_fn()
            try:
                self.specs.append(probe.spec)
            finally:
                probe.close()
        self.viewer = None

    def reset(self):
        """
        Reset every environment and return the stacked initial observations.

        If a step is still in flight its results are received (and discarded)
        first, so the replies read below really belong to the reset commands.
        """
        if self.waiting_step:
            logger.warn('Called reset() while waiting for the step to complete')
            self.step_wait()
        for pipe in self.parent_pipes:
            pipe.send(('reset', None))
        return self._decode_obses([pipe.recv() for pipe in self.parent_pipes])

    def step_async(self, actions):
        """
        Send one action to each worker without waiting for the results.

        actions: sequence with exactly one entry per environment
        """
        assert len(actions) == len(self.parent_pipes)
        for pipe, act in zip(self.parent_pipes, actions):
            pipe.send(('step', act))
        # Remember that replies are pending so reset()/close_extras() drain them.
        self.waiting_step = True

    def step_wait(self):
        """
        Receive the results of the step started by step_async().

        returns: (observations, rewards array, dones array, tuple of infos)
        """
        outs = [pipe.recv() for pipe in self.parent_pipes]
        self.waiting_step = False
        obs, rews, dones, infos = zip(*outs)
        return self._decode_obses(obs), np.array(rews), np.array(dones), infos

    def close_extras(self):
        """
        Ask every worker to shut down, then wait for the worker processes to exit.

        A pending step is drained first so that the reply received after the
        'close' command is the acknowledgement of that command.
        """
        if self.waiting_step:
            self.step_wait()
        for pipe in self.parent_pipes:
            pipe.send(('close', None))
        for pipe in self.parent_pipes:
            pipe.recv()
            pipe.close()
        for proc in self.procs:
            proc.join()

    def get_images(self, mode='human'):
        """
        Return one rendered frame per environment.

        mode is accepted but not forwarded: the request sent to the workers
        carries no payload.
        """
        for pipe in self.parent_pipes:
            pipe.send(('render', None))
        return [pipe.recv() for pipe in self.parent_pipes]

    def _decode_obses(self, obs):
        """
        Build the batched observation from the shared buffers.

        obs is not read: the observation values are taken from self.obs_bufs.
        np.array() copies the data, so the result does not alias the shared
        memory.
        """
        result = {}
        for k in self.obs_keys:
            bufs = [b[k] for b in self.obs_bufs]
            o = [np.frombuffer(b.get_obj(), dtype=self.obs_dtypes[k]).reshape(self.obs_shapes[k]) for b in bufs]
            result[k] = np.array(o)
        return dict_to_obs(result)
def _subproc_worker(pipe, parent_pipe, env_fn_wrapper, obs_bufs, obs_shapes, obs_dtypes, keys):
    """
    Worker-process main loop: drive one environment on behalf of the parent.

    Commands arrive on `pipe` as (cmd, data) tuples.  Observations are never
    sent through the pipe; they are copied into the shared arrays in
    `obs_bufs`, and None is sent in their place.

    pipe: this process's end of the connection
    parent_pipe: the parent's end, closed here because the worker does not use it
    env_fn_wrapper: object whose `.x` attribute is a callable building the environment
    obs_bufs, obs_shapes, obs_dtypes: per-key shared array, shape and dtype
    keys: observation keys to copy on every write
    """
    def _write_obs(maybe_dict_obs):
        # Copy each component of the observation into its shared array.
        obs_by_key = obs_to_dict(maybe_dict_obs)
        for key in keys:
            flat_view = np.frombuffer(obs_bufs[key].get_obj(), dtype=obs_dtypes[key])
            np.copyto(flat_view.reshape(obs_shapes[key]), obs_by_key[key])

    env = env_fn_wrapper.x()
    parent_pipe.close()
    try:
        while True:
            cmd, data = pipe.recv()
            if cmd == 'close':
                # Acknowledge, then leave the loop; `finally` closes the env.
                pipe.send(None)
                break
            if cmd == 'reset':
                _write_obs(env.reset())
                pipe.send(None)
            elif cmd == 'step':
                next_obs, reward, done, info = env.step(data)
                # A finished episode is reset immediately, so the observation
                # published is the first one of the next episode.
                _write_obs(env.reset() if done else next_obs)
                pipe.send((None, reward, done, info))
            elif cmd == 'render':
                pipe.send(env.render(mode='rgb_array'))
            else:
                raise RuntimeError('Got unrecognized cmd %s' % cmd)
    except KeyboardInterrupt:
        print('ShmemVecEnv worker: got KeyboardInterrupt')
    finally:
        env.close()