Compare commits

..

27 Commits

Author SHA1 Message Date
JongGyun Kim
fc0c43b199 RNN support for PPO2 (#859)
* initial implementation of ppo2_rnn.

* set lstm memory as tf.GraphKeys.LOCAL_VARIABLES.

* replace dones with tf.placeholder_with_default.

* improvements for the 'play' option.

* removed unnecessary TODO.

* improve lstm code.

* move learning rate placeholder to optimizer scope.

* support the microbatched model.

* sync cnn lstm layer with originals.

* add cnn_lnlstm layer.

* fix a case when `states` is None.

* add initial_state variable to help test.

* make ppo2 rnn test available.

* rename 'obs' to 'observations'.
rename 'transition' to 'transitions'.
fix forgetting `dones` in the replay buffer.
fix a misuse of `states` and `next_states` in the replay buffer.

* perform initialization only once.
make `test_fixed_sequence` compatible with ppo2.

* adjust input shape.

* fix checking of model input args in the `simple_test` function.

* disable warning on purpose.

* support the 'play' option.

* improve scopes to be compatible with multiple models (i.e., other tensorflow global/local variables)

* clean the scope of ppo2 policy model.

* name the memory variable of PPO RNNs more descriptively

* wrap the initializations in ppo2.

* remove redundant lines.

* update `README.md`.

* add RNN layers.

* add the result of the HalfCheetah-v2 env experiment.

* correct a typo.

* add RNN class.

* rename `nlstm` to `num_units` in RNN builder functions.

* remove state saving.

* reuse RNNs in a2c.utils.

* revert baselines/run.py.

* replace `ppo2.step()` with original interface.

* revert `baselines/common/tests/util.py`.

* remove redundant lines.

* revert `baselines/common/test/util.py` to b875fb7.

* remove `states` variable.

* move RNN class to `baselines/ppo2/layers.py` and revert `baselines/common/models.py` to 858afa8.

* rename `model.step_as_dict` to `model.step_with_dict`.

* removed `ppo_lstm_mlp`.

* fix 02e26fd.
2019-04-26 15:17:56 -07:00
Taeyeong Jeong
5d8041d18e Fix indexing LazyFrames (#875)
Indexing LazyFrames with index i should return the single channel frame
2019-04-19 15:00:09 -07:00
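An illustrative sketch of the intended behavior (assuming the post-fix `LazyFrames` from `baselines.common.atari_wrappers`; the frame shapes below are hypothetical):

```python
import numpy as np
from baselines.common.atari_wrappers import LazyFrames

# four stacked 84x84 grayscale frames, as produced by the FrameStack wrapper
frames = [np.zeros((84, 84, 1), dtype=np.uint8) for _ in range(4)]
stacked = LazyFrames(frames)   # concatenated lazily along the last axis -> (84, 84, 4)

single = stacked[2]            # after the fix: the third channel frame, shape (84, 84)
assert single.shape == (84, 84)
```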
Peter Zhokhov
fa37beb52e fix commit on atari bms page to point to a public commit 2019-04-06 20:03:32 -07:00
Peter Zhokhov
8a97e0df10 fix shuffling bug in ppo1 2019-04-05 15:23:46 -07:00
pzhokhov
fabbf2c611 short-circuit framestack wrapper with size 1 (#871) 2019-04-05 15:18:15 -07:00
Xingdong Zuo
5d285b318f [Update misc_util.py]: clean up unused helper functions (#751)
* Update misc_util.py

* Update misc_util.py
2019-04-05 15:16:26 -07:00
Tim Zaman
49a99c7d23 Add eps to normalization (#797) 2019-04-05 14:46:01 -07:00
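The general idea, as a hedged sketch (the exact call site of the change is not shown in this diff; the names below are illustrative only):

```python
import numpy as np

def normalize(x, mean, std, eps=1e-8):
    # a small epsilon in the denominator keeps the division finite
    # when the running std is (near) zero
    return (x - mean) / (std + eps)

print(normalize(np.array([1.0, 2.0]), mean=1.5, std=0.0))  # no division by zero
```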
Peter Zhokhov
c79b3373bf parse colon-separated env_id's 2019-04-05 14:43:09 -07:00
Sridhar Thiagarajan
6d1c6c78d3 Interface for U.make_session changed (#865) 2019-04-01 16:24:02 -07:00
JongGyun Kim
62a9c76f18 fix the definition of TfInput.make_feed_dict. (#812) 2019-04-01 15:49:25 -07:00
Hao-Chih, Lin
282c9cc91f fix small bug in plot_results() (#864)
Remove the trailing comma after the last input argument
2019-04-01 15:48:35 -07:00
Peter Zhokhov
096f4d9cf0 neaten up stacking logic in mujoco_dset in gail 2019-04-01 15:47:13 -07:00
Mingfei
16136ddca7 fix bugs: obs_ph normalization in adversary.py (#823)
* fix bugs: obs_ph normalization in adversary.py

* fix bug when reshaping obs and acs in Mujoco_Dset
2019-04-01 15:44:31 -07:00
Darío Hereñú
b1644157d6 Fixed typo on #092 (#824) 2019-04-01 15:41:52 -07:00
Yu Feng
58541db226 MPI refer to workers as ranks, not threads. (#833) 2019-04-01 15:38:45 -07:00
zlsh80826
c02b575f01 ppo2: use time.perf_counter() instead of time.time() for time measurement (#847) 2019-04-01 15:37:32 -07:00
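A minimal sketch of the difference (illustrative only): `time.perf_counter()` is a monotonic, high-resolution clock, so elapsed-time measurements are not skewed by system clock adjustments the way `time.time()` can be.

```python
import time

start = time.perf_counter()
total = sum(i * i for i in range(100000))   # stand-in for one training update
elapsed = time.perf_counter() - start
print('update took {:.6f}s'.format(elapsed))
```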
Pastafarianist
897fa31548 Avoid using default config while requesting available GPUs (#863) 2019-03-29 13:25:56 -07:00
Brett Daley
d51f8be8f9 Report episode rewards/length in A2C and ACKTR (#856) 2019-03-28 09:21:48 -07:00
Jacob Hilton
3f2f45acef Merge pull request #860 from openai/build-retro-env-framestack-fix
run.py framestack bug fix
2019-03-25 14:33:15 -07:00
Jacob Hilton
b64974eb90 build_env now doesn't apply frame stack to retro games twice 2019-03-24 12:27:14 -07:00
pzhokhov
1b092434fc remove f-strings for python 3.5 compatibility (#854) 2019-03-16 11:54:47 -07:00
Peter Zhokhov
1259f6ab25 check for environment being vectorized in the play logic in run.py 2019-03-11 17:44:03 -07:00
pzhokhov
74101a9f24 fix freeze of ppo2 (#849)
* fix freeze of ppo2

* unit test for freeze, updated docstring

* more docstring update

* set number of threads to 1 in the test
2019-03-11 17:28:51 -07:00
JongGyun Kim
90d66776a4 remove one of the duplicated lines. (#813) 2019-03-06 15:13:01 -08:00
pzhokhov
b875fb7b5e release Internal changes (#800)
* joshim5 changes (width and height to WarpFrame wrapper)

* match network output with action distribution via a linear layer only if necessary (#167)

* support color vs. grayscale option in WarpFrame wrapper (#166)

* support color vs. grayscale option in WarpFrame wrapper

* Support color in other wrappers

* Updated per Peter's suggestions

* fixing test failures

* ppo2 with microbatches (#168)

* pass microbatch_size to the model during construction

* microbatch fixes and test (#169)

* microbatch fixes and test

* tiny cleanup

* added assertions to the test

* vpg-related fix

* Peterz joshim5 subclass ppo2 model (#170)

* microbatch fixes and test

* tiny cleanup

* added assertions to the test

* vpg-related fix

* subclassing the model to make microbatched version of model WIP

* made microbatched model a subclass of ppo2 Model

* flake8 complaint

* mpi-less ppo2 (resolving merge conflict)

* flake8 and mpi4py imports in ppo2/model.py

* more un-mpying

* merge master

* updates to the benchmark viewer code + autopep8 (#184)

* viz docs and syntactic sugar wip

* update viewer yaml to use persistent volume claims

* move plot_util to baselines.common, update links

* use 1Tb hard drive for results viewer

* small updates to benchmark vizualizer code

* autopep8

* autopep8

* any folder can be a benchmark

* massage games image a little bit

* fixed --preload option in app.py

* remove preload from run_viewer.sh

* remove pdb breakpoints

* update bench-viewer.yaml

* fixed bug (#185)

* fixed bug 

it's wrong to use the else statement, because then no other nodes would start.

* changed the fix slightly

* Refactor her phase 1 (#194)

* add monitor to the rollout envs in her RUN BENCHMARKS her

* Slice -> Slide in her benchmarks RUN BENCHMARKS her

* run her benchmark for 200 epochs

* dummy commit to RUN BENCHMARKS her

* her benchmark for 500 epochs RUN BENCHMARKS her

* add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her

* add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her

* add num_timesteps to her benchmark to be compatible with viewer RUN BENCHMARKS her

* disable saving of policies in her benchmark RUN BENCHMARKS her

* run fetch benchmarks with ppo2 and ddpg RUN BENCHMARKS Fetch

* run fetch benchmarks with ppo2 and ddpg RUN BENCHMARKS Fetch

* launcher refactor wip

* wip

* her works on FetchReach

* her runner refactor RUN BENCHMARKS Fetch1M

* unit test for her

* fixing warnings in mpi_average in her, skip test_fetchreach if mujoco is not present

* pickle-based serialization in her

* remove extra import from subproc_vec_env.py

* investigating differences in rollout.py

* try with old rollout code RUN BENCHMARKS her

* temporarily use DummyVecEnv in cmd_util.py RUN BENCHMARKS her

* dummy commit to RUN BENCHMARKS her

* set info_values in rollout worker in her RUN BENCHMARKS her

* bug in rollout_new.py RUN BENCHMARKS her

* fixed bug in rollout_new.py RUN BENCHMARKS her

* do not use last step because vecenv calls reset and returns obs after reset RUN BENCHMARKS her

* updated buffer sizes RUN BENCHMARKS her

* fixed loading/saving via joblib

* dust off learning from demonstrations in HER, docs, refactor

* add deprecation notice on her play and plot files

* address comments by Matthias

* 1.5 months of codegen changes (#196)

* play with resnet

* feed_dict version

* coinrun prob and more stats

* fixes to get_choices_specs & hp search

* minor prob fixes

* minor fixes

* minor

* alternative version of rl_algo stuff

* pylint fixes

* fix bugs, move node_filters to soup

* changed how get_algo works

* change how get_algo works, probably broke all tests

* continue previous refactor

* get eval_agent running again

* fixing tests

* fix tests

* fix more tests

* clean up cma stuff

* fix experiment

* minor changes to eval_agent to make ppo_metal use gpu

* make dict space work

* modify mac makefile to use conda

* recurrent layers

* play with bn and resnets

* minor hp changes

* minor

* got rid of use_fb argument and jtft (joint-train-fine-tune) functionality
built test phase directly into AlgoProb

* make new rl algos generateable

* pylint; start fixing tests

* fixing tests

* more test fixes

* pylint

* fix search

* work on search

* hack around infinite loop caused by scan

* algo search fixes

* misc changes for search expt

* enable annealing, overriding options of Op

* pylint fixes

* identity op

* achieve use_last_output through masking so it automatically works in other distributions

* fix tests

* minor

* discrete

* use_last_output to be just a preference, not a hard constraint

* pred delay, pruning

* require nontrivial inputs

* aliases for get_sm

* add probname to probs

* fixes

* small fixes

* fix tests

* fix tests

* fix tests

* minor

* test scripts

* dualgru network improvements

* minor

* work on mysterious bugs

* rcall gpu-usage command for kube

* use cache dir that’s not in code folder, so that it doesn’t get removed by rcall code rsync

* add power mode to gpu usage

* make sure train/test actually different

* remove VR for now

* minor fixes

* simplify soln_db

* minor

* big refactor of mpi eda

* improve mpieda for multitask

* - get rid of timelimit hack
- add __del__ to cleanup SubprocVecEnv

* get multitask working better

* fixes

* working on atari, various

* annotate ops with whether they’re parametrized

* minor

* gym version

* rand atari prob

* minor

* SolnDb bugfix and name change

* pyspy script

* switch conv layers

* fix roboschool/bullet3

* nenvs assertion

* fix rand atari

* get rid of blanket exception catching
fix soln_db bug

* fix rand_atari

* dynamic routing as cmdline arg

* slight modifications to test_mpi_map and pyspy-all

* max_tries argument for run_until_successs

* dedup option in train_mle

* simplify soln_db

* increase atari horizon for 1 experiment

* start implementing reward increment

* ent multiplier

* create cc dsl
other misc fixes

* cc ops

* q_func -> qs in rl_algos_cc.py

* fix PredictDistr

* rl_ops_cc fixes, MakeAction op

* augment algo agent to support cc stuff

* work on ddpg experiments

* fix blocking
temporarily change logger

* allow layer scaling

* pylint fixes

* spawn_method

* isolate ddpg hacks

* improve pruning

* use spawn for subproc

* remove use of python -c in rcall

* fix pylint warning

* fix static

* maybe fix local backend

* switch to DummyVecEnv

* making some fixes via pylint

* pylint fixes

* fixing tests

* fix tests

* fix tests

* write scaffolding for SSL in Codegen

* logger fix

* fix error

* add EMA op to sl_ops

* save many changes

* save

* add upsampler

* add sl ops, enhance state machine

* get ssl search working — some gross hacking

* fix session/graph issue

* fix importing

* work on mle

* - scale embeddings in gru model
- better exception handling in sl_prob
- use emas for test/val
- use non-contrib batch_norm layer

* improve logging

* option to average before dumping in logger

* default arguments, etc

* new ddpg and identity test

* concat fix

* minor

* move realistic ssl stuff to third-party (underscore to dash)

* fixes

* remove realistic_ssl_evaluation

* pylint fixes

* use gym master

* try again

* pass around args without gin

* fix tests

* separate line to install gym

* rename failing tests that should be ignored

* add data aug

* ssl improvements

* use fixed time limit

* try to fix baselines tests

* add score_floor, max_walltime, fiddle with lr decay

* realistic_ssl

* autopep8

* various ssl
- enable blocking grad for simplification
- kl
- multiple final prediction

* fix pruning

* misc ssl stuff

* bring back linear schedule, don’t use allgather for collecting stats
(i’ve been getting nondeterministic errors from the old code)

* save/load weights in SSL, big stepsize

* cleanup SslProb

* fix

* get rid of kl coef

* fix simplification, lower lr

* search over hps

* minor fixes

* minor

* static analysis

* move files and rename things for improved consistency.
still broken, and just saving before making nontrivial changes

* various

* make tests pass

* move coinrun_train to codegen since it depends on codegen

* fixes

* pylint fixes

* improve tests
fix some things

* improve tests

* lint

* fix up db_info.py, tests

* mostly restore master version of envs directory, except for makefile changes

* fix tests

* improve printing

* minor fixes

* fix fixmes

* pruning test

* fixes

* lint

* write new test that makes tf graphs of random algos; fix some bugs it caught

* add --delete flag to rcall upload-code command

* lint

* get cifar10 lazily for testing purposes

* disable codegen ci tests for now

* clean up rl_ops

* rename spec classes

* td3 with identity test

* identity tests without gin files

* remove gin.configurable from AlgoAgent

* comments about reduction in rl_ops_cc

* address @pzhokhov comments

* fix tests

* more linting

* better tests

* clean up filtering a bit

* fix concat

* delayed logger configuration (#208)

* delayed logger configuration

* fix typo

* setters and getters for Logger.DEFAULT as well

* do away with fancy property stuff - unable to get it to work with class level methods

* grammar and spaces

* spaces

* use get_current function instead of reading Logger.CURRENT

* autopep8

* disable mpi in subprocesses (#213)

* lazy_mpi load

* cleanups

* more lazy mpi

* don't pretend that class is a module, just use it as a class

* mass-replace mpi4py imports

* flake8

* fix previous lazy_mpi imports

* silly recursion

* try os.environ hack

* better prefix test, work with mpich

* restored MPI imports

* removed commented import in test_with_mpi

* restored codegen from master

* remove lazy mpi

* restored changes from rl-algs

* remove extra files

* address Chris' comments

* use spawn for shmem vec env as well (#2) (#219)

* lazy_mpi load

* cleanups

* more lazy mpi

* don't pretend that class is a module, just use it as a class

* mass-replace mpi4py imports

* flake8

* fix previous lazy_mpi imports

* silly recursion

* try os.environ hack

* better prefix test, work with mpich

* restored MPI imports

* removed commented import in test_with_mpi

* restored codegen from master

* remove lazy mpi

* restored changes from rl-algs

* remove extra files

* port mpi fix to shmem vec env

* increase the mpi test default timeout

* change humanoid hyperparameters, get rid of clip_frac annealing, as it's apparently dangerous

* remove clip_frac schedule from ppo2

* more timesteps in humanoid run

* whitespace + RUN BENCHMARKS

* baselines: export vecenvs from folder (#221)

* baselines: export vecenvs from folder

* put missing function back in

* add missing imports

* more imports

* longer mpi timeout?

* make default logger configuration the same as call to logger.configure() (#222)

* Vecenv refactor (#223)

* update karl util

* restore pvi flag

* change rcall auto cpu behavior, move gin.configurable, add os.makedirs

* vecenv refactor

* aux buf index fix

* add num aux obs

* reset level with enter

* restore high difficulty flag

* bugfix

* restore train_coinrun.py

* tweaks

* renaming

* renaming

* better arguments handling

* more options

* options cleanup

* game data refactor

* more options

* args for train_procgen

* add close handler to interactive base class

* use debug build if debug=True, fix range on aux_obs

* add ProcGenEnv to __init__.py, add missing imports to procgen.py

* export RemoveDictWrapper and build, update train_procgen.py, move assets download into env creation and replace init_assets_and_build with just build

* fix formatting issues

* only call global init once

* fix path in setup.py

* revert part of makefile

* ignore IDE files and folders

* vec remove dict

* export VecRemoveDictObs

* remove RemoveDictWrapper

* remove IDE files

* move shared .h and .cpp files to common folder, update build to use those, dedupe env.cpp

* fix missing header

* try unified build function

* remove old scripts dir

* add comment on build

* upload libenv with render fixes

* tell qthreads to die when we unload the library

* pyglet.app.run is garbage

* static fixes

* whoops

* actually vsync is on

* cleanup

* cleanup

* extern C for libenv interface

* parse util rcall arg

* high difficulty fix

* game type enums

* ProcGenEnv subclasses

* game type cleanup

* unrecognized key

* unrecognized game type

* parse util reorg

* args management

* typo fix

* GinParser

* arg tweaks

* tweak

* restore start_level/num_levels setting

* fix create_procgen_env interface

* build fix

* procgen args in init signature

* fix

* build fix

* fix logger usage in ppo_metal/run_retro

* removed unnecessary OrderedDict requirement in subproc_vec_env

* flake8 fix

* allow for non-mpi tests

* mpi test fixes

* flake8; removed special logic for discrete spaces in dummy_vec_env

* remove forked argument in front of tests - does not play nicely with subprocvecenv in spawned processes; analog of forked in ddpg/test_smoke

* Everyrl initial commit & a few minor baselines changes (#226)

* everyrl initial commit

* add keep_buf argument to VecMonitor

* logger changes: set_comm and fix to mpi_mean functionality

* if filename not provided, don't create ResultsWriter

* change variable syncing function to simplify its usage. now you should initialize from all mpi processes

* everyrl coinrun changes

* tf_distr changes, bugfix

* get_one

* bring back get_next to temporarily restore code

* lint fixes

* fix test

* rename profile function

* rename gaussian

* fix coinrun training script

* change random seeding to work with new gym version (#231)

* change random seeding to work with new gym version

* move seeding to seed() method

* fix mnistenv

* actually try some of the tests before pushing

* more deterministic fixed seq

* misc changes to vecenvs and run.py for benchmarks (#236)

* misc changes to vecenvs and run.py for benchmarks

* dont seed global gen

* update more references to assert_venvs_equal

* Rl19 (#232)

* everyrl initial commit

* add keep_buf argument to VecMonitor

* logger changes: set_comm and fix to mpi_mean functionality

* if filename not provided, don't create ResultsWriter

* change variable syncing function to simplify its usage. now you should initialize from all mpi processes

* everyrl coinrun changes

* tf_distr changes, bugfix

* get_one

* bring back get_next to temporarily restore code

* lint fixes

* fix test

* rename profile function

* rename gaussian

* fix coinrun training script

* rl19

* remove everyrl dir which appeared in the merge for some reason

* readme

* fiddle with ddpg

* make ddpg work

* steps_total argument

* gpu count

* clean up hyperparams and shape math

* logging + saving

* configuration stuff

* fixes, smoke tests

* fix stats

* make load_results return dicts -- easier to create the same kind of objects with some other mechanism for passing to downstream functions

* benchmarks

* fix tests

* add dqn to tests, fix it

* minor

* turned annotated transformer (pytorch) into a script

* more refactoring

* jax stuff

* cluster

* minor

* copy & paste alec code

* sign error

* add huber, rename some parameters, snapshotting off by default

* remove jax stuff

* minor

* move maze env

* minor

* remove trailing spaces

* remove trailing space

* lint

* fix test breakage due to gym update

* rename function

* move maze back to codegen

* get recurrent ppo working

* enable both lstm and gru

* script to print table of benchmark results

* various

* fix dqn

* add fixup initializer, remove lastrew

* organize logging stats

* fix silly bug

* refactor models

* fix mpi usage

* check sync

* minor

* change vf coef, hps

* clean up slicing in ppo

* minor fixes

* caching transformer

* docstrings

* xf fixes

* get rid of 'B' and 'BT' arguments

* minor

* transformer example

* remove output_kind from base class until we have a better idea how to use it

* add comments, revert maze stuff

* flake8

* codegen lint

* fix codegen tests

* responded to peter's comments

* lint fixes

* minor changes to baselines (#243)

* minor changes to baselines

* fix spaces reference

* remove flake8 disable comments and fix import

* okay maybe don't add spec to vec_env

* Merge branch 'master' of github.com:openai/games

 the commit.

* flake8 complaints in baselines/her
2019-02-27 15:35:31 -08:00
Peter Zhokhov
675b100190 raised the tolerance on the test_microbatches test 2019-02-27 14:22:24 -08:00
Peter Zhokhov
adc4388f6b fixes to catch changes in gym 2019-02-27 12:49:40 -08:00
62 changed files with 1376 additions and 790 deletions

View File

@@ -11,4 +11,4 @@ install:
script:
- flake8 . --show-source --statistics
- docker run baselines-test pytest -v --forked .
- docker run baselines-test pytest -v .

View File

@@ -89,7 +89,7 @@ python -m baselines.run --alg=ppo2 --env=Humanoid-v2 --network=mlp --num_timeste
will set entropy coefficient to 0.1, and construct fully connected network with 3 layers with 32 hidden units in each, and create a separate network for value function estimation (so that its parameters are not shared with the policy network, but the structure is the same)
See docstrings in [common/models.py](baselines/common/models.py) for description of network parameters for each type of model, and
docstring for [baselines/ppo2/ppo2.py/learn()](baselines/ppo2/ppo2.py#L152) for the description of the ppo2 hyperparamters.
docstring for [baselines/ppo2/ppo2.py/learn()](baselines/ppo2/ppo2.py#L152) for the description of the ppo2 hyperparameters.
### Example 2. DQN on Atari
DQN with Atari is at this point a classics of benchmarks. To run the baselines implementation of DQN on Atari Pong:

View File

@@ -11,6 +11,8 @@ from baselines.common.policies import build_policy
from baselines.a2c.utils import Scheduler, find_trainable_variables
from baselines.a2c.runner import Runner
from baselines.ppo2.ppo2 import safemean
from collections import deque
from tensorflow import losses
@@ -195,6 +197,7 @@ def learn(
# Instantiate the runner object
runner = Runner(env, model, nsteps=nsteps, gamma=gamma)
epinfobuf = deque(maxlen=100)
# Calculate the batch_size
nbatch = nenvs*nsteps
@@ -204,7 +207,8 @@ def learn(
for update in range(1, total_timesteps//nbatch+1):
# Get mini batch of experiences
obs, states, rewards, masks, actions, values = runner.run()
obs, states, rewards, masks, actions, values, epinfos = runner.run()
epinfobuf.extend(epinfos)
policy_loss, value_loss, policy_entropy = model.train(obs, states, rewards, masks, actions, values)
nseconds = time.time()-tstart
@@ -221,6 +225,8 @@ def learn(
logger.record_tabular("policy_entropy", float(policy_entropy))
logger.record_tabular("value_loss", float(value_loss))
logger.record_tabular("explained_variance", float(ev))
logger.record_tabular("eprewmean", safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.record_tabular("eplenmean", safemean([epinfo['l'] for epinfo in epinfobuf]))
logger.dump_tabular()
return model

View File

@@ -22,6 +22,7 @@ class Runner(AbstractEnvRunner):
# We initialize the lists that will contain the mb of experiences
mb_obs, mb_rewards, mb_actions, mb_values, mb_dones = [],[],[],[],[]
mb_states = self.states
epinfos = []
for n in range(self.nsteps):
# Given observations, take action and value (V(s))
# We already have self.obs because Runner superclass run self.obs[:] = env.reset() on init
@@ -34,7 +35,10 @@ class Runner(AbstractEnvRunner):
mb_dones.append(self.dones)
# Take actions in env and look the results
obs, rewards, dones, _ = self.env.step(actions)
obs, rewards, dones, infos = self.env.step(actions)
for info in infos:
maybeepinfo = info.get('episode')
if maybeepinfo: epinfos.append(maybeepinfo)
self.states = states
self.dones = dones
self.obs = obs
@@ -69,4 +73,4 @@ class Runner(AbstractEnvRunner):
mb_rewards = mb_rewards.flatten()
mb_values = mb_values.flatten()
mb_masks = mb_masks.flatten()
return mb_obs, mb_states, mb_rewards, mb_masks, mb_actions, mb_values
return mb_obs, mb_states, mb_rewards, mb_masks, mb_actions, mb_values, epinfos
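For reference, a hedged sketch of what gets collected above: the `Monitor` wrapper places an `'episode'` dict with total reward `'r'`, length `'l'`, and elapsed time `'t'` into `info` at episode end, and `safemean` (imported from ppo2 in the hunks above) averages them without crashing on an empty buffer. The buffer contents below are made up for illustration.

```python
import numpy as np

def safemean(xs):
    # nan instead of a crash when no episode has finished yet
    return np.nan if len(xs) == 0 else np.mean(xs)

epinfobuf = [{'r': 10.0, 'l': 200, 't': 3.2}, {'r': 12.5, 'l': 180, 't': 6.1}]
print(safemean([epinfo['r'] for epinfo in epinfobuf]))   # 11.25
print(safemean([]))                                      # nan
```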

View File

@@ -11,6 +11,8 @@ from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.a2c.runner import Runner
from baselines.a2c.utils import Scheduler, find_trainable_variables
from baselines.acktr import kfac
from baselines.ppo2.ppo2 import safemean
from collections import deque
class Model(object):
@@ -118,6 +120,7 @@ def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interva
model.load(load_path)
runner = Runner(env, model, nsteps=nsteps, gamma=gamma)
epinfobuf = deque(maxlen=100)
nbatch = nenvs*nsteps
tstart = time.time()
coord = tf.train.Coordinator()
@@ -127,7 +130,8 @@ def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interva
enqueue_threads = []
for update in range(1, total_timesteps//nbatch+1):
obs, states, rewards, masks, actions, values = runner.run()
obs, states, rewards, masks, actions, values, epinfos = runner.run()
epinfobuf.extend(epinfos)
policy_loss, value_loss, policy_entropy = model.train(obs, states, rewards, masks, actions, values)
model.old_obs = obs
nseconds = time.time()-tstart
@@ -141,6 +145,8 @@ def learn(network, env, seed, total_timesteps=int(40e6), gamma=0.99, log_interva
logger.record_tabular("policy_loss", float(policy_loss))
logger.record_tabular("value_loss", float(value_loss))
logger.record_tabular("explained_variance", float(ev))
logger.record_tabular("eprewmean", safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.record_tabular("eplenmean", safemean([epinfo['l'] for epinfo in epinfobuf]))
logger.dump_tabular()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir():

View File

@@ -20,7 +20,7 @@ def register_benchmark(benchmark):
if 'tasks' in benchmark:
for t in benchmark['tasks']:
if 'desc' not in t:
t['desc'] = remove_version_re.sub('', t['env_id'])
t['desc'] = remove_version_re.sub('', t.get('env_id', t.get('id')))
_BENCHMARKS.append(benchmark)

View File

@@ -16,11 +16,13 @@ class Monitor(Wrapper):
def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()):
Wrapper.__init__(self, env=env)
self.tstart = time.time()
self.results_writer = ResultsWriter(
filename,
header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id},
extra_keys=reset_keywords + info_keywords
)
if filename:
self.results_writer = ResultsWriter(filename,
header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id},
extra_keys=reset_keywords + info_keywords
)
else:
self.results_writer = None
self.reset_keywords = reset_keywords
self.info_keywords = info_keywords
self.allow_early_resets = allow_early_resets
@@ -68,8 +70,9 @@ class Monitor(Wrapper):
self.episode_lengths.append(eplen)
self.episode_times.append(time.time() - self.tstart)
epinfo.update(self.current_reset_info)
self.results_writer.write_row(epinfo)
if self.results_writer:
self.results_writer.write_row(epinfo)
assert isinstance(info, dict)
if isinstance(info, dict):
info['episode'] = epinfo
@@ -96,24 +99,21 @@ class LoadMonitorResultsError(Exception):
class ResultsWriter(object):
def __init__(self, filename=None, header='', extra_keys=()):
def __init__(self, filename, header='', extra_keys=()):
self.extra_keys = extra_keys
if filename is None:
self.f = None
self.logger = None
else:
if not filename.endswith(Monitor.EXT):
if osp.isdir(filename):
filename = osp.join(filename, Monitor.EXT)
else:
filename = filename + "." + Monitor.EXT
self.f = open(filename, "wt")
if isinstance(header, dict):
header = '# {} \n'.format(json.dumps(header))
self.f.write(header)
self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
self.logger.writeheader()
self.f.flush()
assert filename is not None
if not filename.endswith(Monitor.EXT):
if osp.isdir(filename):
filename = osp.join(filename, Monitor.EXT)
else:
filename = filename + "." + Monitor.EXT
self.f = open(filename, "wt")
if isinstance(header, dict):
header = '# {} \n'.format(json.dumps(header))
self.f.write(header)
self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
self.logger.writeheader()
self.f.flush()
def write_row(self, epinfo):
if self.logger:
@@ -121,7 +121,6 @@ class ResultsWriter(object):
self.f.flush()
def get_monitor_files(dir):
return glob(osp.join(dir, "*" + Monitor.EXT))

View File

@@ -6,6 +6,8 @@ import gym
from gym import spaces
import cv2
cv2.ocl.setUseOpenCL(False)
from .wrappers import TimeLimit
class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30):
@@ -219,16 +221,15 @@ class LazyFrames(object):
return len(self._force())
def __getitem__(self, i):
return self._force()[i]
return self._force()[..., i]
def make_atari(env_id, timelimit=True):
# XXX(john): remove timelimit argument after gym is upgraded to allow double wrapping
def make_atari(env_id, max_episode_steps=None):
env = gym.make(env_id)
if not timelimit:
env = env.env
assert 'NoFrameskip' in env.spec.id
env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
if max_episode_steps is not None:
env = TimeLimit(env, max_episode_steps=max_episode_steps)
return env
def wrap_deepmind(env, episode_life=True, clip_rewards=True, frame_stack=False, scale=False):

View File

@@ -23,10 +23,7 @@ def make_vec_env(env_id, env_type, num_env, seed,
start_index=0,
reward_scale=1.0,
flatten_dict_observations=True,
gamestate=None,
initializer=None,
env_kwargs=None,
force_dummy=False):
gamestate=None):
"""
Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo.
"""
@@ -34,7 +31,7 @@ def make_vec_env(env_id, env_type, num_env, seed,
mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
seed = seed + 10000 * mpi_rank if seed is not None else None
logger_dir = logger.get_dir()
def make_thunk(rank, initializer=None):
def make_thunk(rank):
return lambda: make_env(
env_id=env_id,
env_type=env_type,
@@ -45,22 +42,17 @@ def make_vec_env(env_id, env_type, num_env, seed,
gamestate=gamestate,
flatten_dict_observations=flatten_dict_observations,
wrapper_kwargs=wrapper_kwargs,
logger_dir=logger_dir,
initializer=initializer,
env_kwargs=env_kwargs,
logger_dir=logger_dir
)
set_global_seeds(seed)
if not force_dummy and num_env > 1:
return SubprocVecEnv([make_thunk(i + start_index, initializer=initializer) for i in range(num_env)])
if num_env > 1:
return SubprocVecEnv([make_thunk(i + start_index) for i in range(num_env)])
else:
return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)])
return DummyVecEnv([make_thunk(start_index)])
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, logger_dir=None, initializer=None, env_kwargs=None):
if initializer is not None:
initializer(mpi_rank=mpi_rank, subrank=subrank)
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, logger_dir=None):
wrapper_kwargs = wrapper_kwargs or {}
if env_type == 'atari':
env = make_atari(env_id)
@@ -69,7 +61,7 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.
gamestate = gamestate or retro.State.DEFAULT
env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
else:
env = gym.make(env_id, **(env_kwargs or {}))
env = gym.make(env_id)
if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):
keys = env.observation_space.spaces.keys()
@@ -83,6 +75,8 @@ def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.
if env_type == 'atari':
env = wrap_deepmind(env, **wrapper_kwargs)
elif env_type == 'retro':
if 'frame_stack' not in wrapper_kwargs:
wrapper_kwargs['frame_stack'] = 1
env = retro_wrappers.wrap_deepmind_retro(env, **wrapper_kwargs)
if reward_scale != 1:
@@ -156,7 +150,6 @@ def common_arg_parser():
parser.add_argument('--save_video_interval', help='Save video every x steps (0 = disabled)', default=0, type=int)
parser.add_argument('--save_video_length', help='Length of recorded video. Default: 200', default=200, type=int)
parser.add_argument('--play', default=False, action='store_true')
parser.add_argument('--extra_import', help='Extra module to import to access external environments', type=str, default=None)
return parser
def robotics_arg_parser():

View File

@@ -206,7 +206,8 @@ class CategoricalPd(Pd):
class MultiCategoricalPd(Pd):
def __init__(self, nvec, flat):
self.flat = flat
self.categoricals = list(map(CategoricalPd, tf.split(flat, nvec, axis=-1)))
self.categoricals = list(map(CategoricalPd,
tf.split(flat, np.array(nvec, dtype=np.int32), axis=-1)))
def flatparam(self):
return self.flat
def mode(self):
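A sketch of the split being performed (TF1-style placeholder; shapes are hypothetical): the flat logits of width `sum(nvec)` are cut into one categorical head per sub-action, and casting `nvec` to an int32 array is presumably what keeps `tf.split` happy with the int64 `nvec` arrays that gym's MultiDiscrete spaces report.

```python
import numpy as np
import tensorflow as tf

nvec = np.array([3, 5, 2], dtype=np.int64)                  # e.g. a MultiDiscrete space's nvec
flat = tf.placeholder(tf.float32, [None, int(nvec.sum())])  # batched flat logits
heads = tf.split(flat, np.array(nvec, dtype=np.int32), axis=-1)
print([h.shape.as_list() for h in heads])                   # [[None, 3], [None, 5], [None, 2]]
```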

View File

@@ -13,27 +13,6 @@ def zipsame(*seqs):
return zip(*seqs)
def unpack(seq, sizes):
"""
Unpack 'seq' into a sequence of lists, with lengths specified by 'sizes'.
None = just one bare element, not a list
Example:
unpack([1,2,3,4,5,6], [3,None,2]) -> ([1,2,3], 4, [5,6])
"""
seq = list(seq)
it = iter(seq)
assert sum(1 if s is None else s for s in sizes) == len(seq), "Trying to unpack %s into %s" % (seq, sizes)
for size in sizes:
if size is None:
yield it.__next__()
else:
li = []
for _ in range(size):
li.append(it.__next__())
yield li
class EzPickle(object):
"""Objects that are pickled and unpickled via their constructor
arguments.

View File

@@ -1,6 +1,11 @@
import numpy as np
import tensorflow as tf
from mpi4py import MPI
from baselines.common import tf_util as U
from baselines.common.tests.test_with_mpi import with_mpi
try:
from mpi4py import MPI
except ImportError:
MPI = None
class MpiAdamOptimizer(tf.train.AdamOptimizer):
"""Adam optimizer that averages gradients across mpi processes."""
@@ -13,19 +18,61 @@ class MpiAdamOptimizer(tf.train.AdamOptimizer):
flat_grad = tf.concat([tf.reshape(g, (-1,)) for g, v in grads_and_vars], axis=0)
shapes = [v.shape.as_list() for g, v in grads_and_vars]
sizes = [int(np.prod(s)) for s in shapes]
num_tasks = self.comm.Get_size()
buf = np.zeros(sum(sizes), np.float32)
def _collect_grads(flat_grad):
countholder = [0] # Counts how many times _collect_grads has been called
stat = tf.reduce_sum(grads_and_vars[0][1]) # sum of first variable
def _collect_grads(flat_grad, np_stat):
self.comm.Allreduce(flat_grad, buf, op=MPI.SUM)
np.divide(buf, float(num_tasks), out=buf)
if countholder[0] % 100 == 0:
check_synced(np_stat, self.comm)
countholder[0] += 1
return buf
avg_flat_grad = tf.py_func(_collect_grads, [flat_grad], tf.float32)
avg_flat_grad = tf.py_func(_collect_grads, [flat_grad, stat], tf.float32)
avg_flat_grad.set_shape(flat_grad.shape)
avg_grads = tf.split(avg_flat_grad, sizes, axis=0)
avg_grads_and_vars = [(tf.reshape(g, v.shape), v)
for g, (_, v) in zip(avg_grads, grads_and_vars)]
return avg_grads_and_vars
def check_synced(localval, comm=None):
"""
It's common to forget to initialize your variables to the same values, or
(less commonly) if you update them in some other way than adam, to get them out of sync.
This function checks that variables on all MPI workers are the same, and raises
an AssertionError otherwise
Arguments:
comm: MPI communicator
localval: list of local variables (list of variables on current worker to be compared with the other workers)
"""
comm = comm or MPI.COMM_WORLD
vals = comm.gather(localval)
if comm.rank == 0:
assert all(val==vals[0] for val in vals[1:])
@with_mpi(timeout=5)
def test_nonfreeze():
np.random.seed(0)
tf.set_random_seed(0)
a = tf.Variable(np.random.randn(3).astype('float32'))
b = tf.Variable(np.random.randn(2,5).astype('float32'))
loss = tf.reduce_sum(tf.square(a)) + tf.reduce_sum(tf.sin(b))
stepsize = 1e-2
# for some reason the session config with inter_op_parallelism_threads was causing
# nested sess.run calls to freeze
config = tf.ConfigProto(inter_op_parallelism_threads=1)
sess = U.get_session(config=config)
update_op = MpiAdamOptimizer(comm=MPI.COMM_WORLD, learning_rate=stepsize).minimize(loss)
sess.run(tf.global_variables_initializer())
losslist_ref = []
for i in range(100):
l,_ = sess.run([loss, update_op])
print(i, l)
losslist_ref.append(l)
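A hedged usage sketch of the synchronization check added above (assumes the helper lives in `baselines/common/mpi_adam_optimizer.py` as this hunk suggests, and that the script is launched under `mpiexec`):

```python
# run with: mpiexec -n 2 python check_sync_example.py
import numpy as np
from mpi4py import MPI
from baselines.common.mpi_adam_optimizer import check_synced

local_stat = float(np.sum(np.ones(3)))          # identical on every rank, so the check passes
check_synced(local_stat, comm=MPI.COMM_WORLD)   # rank 0 raises AssertionError if ranks diverge
```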

View File

@@ -1,9 +1,16 @@
from collections import defaultdict
from mpi4py import MPI
import os, numpy as np
import platform
import shutil
import subprocess
import warnings
import sys
try:
from mpi4py import MPI
except ImportError:
MPI = None
def sync_from_root(sess, variables, comm=None):
"""
@@ -13,15 +20,10 @@ def sync_from_root(sess, variables, comm=None):
variables: all parameter variables including optimizer's
"""
if comm is None: comm = MPI.COMM_WORLD
rank = comm.Get_rank()
for var in variables:
if rank == 0:
comm.Bcast(sess.run(var))
else:
import tensorflow as tf
returned_var = np.empty(var.shape, dtype='float32')
comm.Bcast(returned_var)
sess.run(tf.assign(var, returned_var))
import tensorflow as tf
values = comm.bcast(sess.run(variables))
sess.run([tf.assign(var, val)
for (var, val) in zip(variables, values)])
def gpu_count():
"""
@@ -34,13 +36,15 @@ def gpu_count():
def setup_mpi_gpus():
"""
Set CUDA_VISIBLE_DEVICES using MPI.
Set CUDA_VISIBLE_DEVICES to MPI rank if not already set
"""
num_gpus = gpu_count()
if num_gpus == 0:
return
local_rank, _ = get_local_rank_size(MPI.COMM_WORLD)
os.environ['CUDA_VISIBLE_DEVICES'] = str(local_rank % num_gpus)
if 'CUDA_VISIBLE_DEVICES' not in os.environ:
if sys.platform == 'darwin': # This Assumes if you're on OSX you're just
ids = [] # doing a smoke test and don't want GPUs
else:
lrank, _lsize = get_local_rank_size(MPI.COMM_WORLD)
ids = [lrank]
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, ids))
def get_local_rank_size(comm):
"""
@@ -81,6 +85,9 @@ def share_file(comm, path):
comm.Barrier()
def dict_gather(comm, d, op='mean', assert_all_have_data=True):
"""
Perform a reduction operation over dicts
"""
if comm is None: return d
alldicts = comm.allgather(d)
size = comm.size
@@ -99,3 +106,28 @@ def dict_gather(comm, d, op='mean', assert_all_have_data=True):
else:
assert 0, op
return result
def mpi_weighted_mean(comm, local_name2valcount):
"""
Perform a weighted average over dicts that are each on a different node
Input: local_name2valcount: dict mapping key -> (value, count)
Returns: key -> mean
"""
all_name2valcount = comm.gather(local_name2valcount)
if comm.rank == 0:
name2sum = defaultdict(float)
name2count = defaultdict(float)
for n2vc in all_name2valcount:
for (name, (val, count)) in n2vc.items():
try:
val = float(val)
except ValueError:
if comm.rank == 0:
warnings.warn('WARNING: tried to compute mean on non-float {}={}'.format(name, val))
else:
name2sum[name] += val * count
name2count[name] += count
return {name : name2sum[name] / name2count[name] for name in name2sum}
else:
return {}
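A pure-Python sketch of the reduction that `mpi_weighted_mean` performs across ranks (no MPI here; the per-rank dicts mirror the test further below):

```python
from collections import defaultdict

rank_dicts = [{'a': (10, 2), 'b': (20, 3)},   # (value, count) pairs from rank 0
              {'a': (19, 1), 'c': (42, 3)}]   # (value, count) pairs from rank 1

name2sum, name2count = defaultdict(float), defaultdict(float)
for n2vc in rank_dicts:
    for name, (val, count) in n2vc.items():
        name2sum[name] += val * count
        name2count[name] += count

means = {name: name2sum[name] / name2count[name] for name in name2sum}
assert means == {'a': (10 * 2 + 19) / 3.0, 'b': 20.0, 'c': 42.0}
```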

View File

@@ -248,7 +248,7 @@ def plot_results(
figsize=None,
legend_outside=False,
resample=0,
smooth_step=1.0,
smooth_step=1.0
):
'''
Plot multiple Results objects

View File

@@ -1,25 +1,11 @@
# flake8: noqa F403, F405
from .atari_wrappers import *
from collections import deque
import cv2
cv2.ocl.setUseOpenCL(False)
from .atari_wrappers import WarpFrame, ClipRewardEnv, FrameStack, ScaledFloatFrame
from .wrappers import TimeLimit
import numpy as np
import gym
class TimeLimit(gym.Wrapper):
def __init__(self, env, max_episode_steps=None):
super(TimeLimit, self).__init__(env)
self._max_episode_steps = max_episode_steps
self._elapsed_steps = 0
def step(self, ac):
observation, reward, done, info = self.env.step(ac)
self._elapsed_steps += 1
if self._elapsed_steps >= self._max_episode_steps:
done = True
info['TimeLimit.truncated'] = True
return observation, reward, done, info
def reset(self, **kwargs):
self._elapsed_steps = 0
return self.env.reset(**kwargs)
class StochasticFrameSkip(gym.Wrapper):
def __init__(self, env, n, stickprob):
@@ -99,7 +85,7 @@ class Downsample(gym.ObservationWrapper):
gym.ObservationWrapper.__init__(self, env)
(oldh, oldw, oldc) = env.observation_space.shape
newshape = (oldh//ratio, oldw//ratio, oldc)
self.observation_space = spaces.Box(low=0, high=255,
self.observation_space = gym.spaces.Box(low=0, high=255,
shape=newshape, dtype=np.uint8)
def observation(self, frame):
@@ -116,7 +102,7 @@ class Rgb2gray(gym.ObservationWrapper):
"""
gym.ObservationWrapper.__init__(self, env)
(oldh, oldw, _oldc) = env.observation_space.shape
self.observation_space = spaces.Box(low=0, high=255,
self.observation_space = gym.spaces.Box(low=0, high=255,
shape=(oldh, oldw, 1), dtype=np.uint8)
def observation(self, frame):
@@ -213,8 +199,10 @@ class StartDoingRandomActionsWrapper(gym.Wrapper):
self.some_random_steps()
return self.last_obs, rew, done, info
def make_retro(*, game, state, max_episode_steps, **kwargs):
def make_retro(*, game, state=None, max_episode_steps=4500, **kwargs):
import retro
if state is None:
state = retro.State.DEFAULT
env = retro.make(game, state, **kwargs)
env = StochasticFrameSkip(env, n=4, stickprob=0.25)
if max_episode_steps is not None:
@@ -227,7 +215,8 @@ def wrap_deepmind_retro(env, scale=True, frame_stack=4):
"""
env = WarpFrame(env)
env = ClipRewardEnv(env)
env = FrameStack(env, frame_stack)
if frame_stack > 1:
env = FrameStack(env, frame_stack)
if scale:
env = ScaledFloatFrame(env)
return env

View File

@@ -177,7 +177,7 @@ def profile_tf_runningmeanstd():
outfile = '/tmp/timeline.json'
with open(outfile, 'wt') as f:
f.write(chrome_trace)
print(f'Successfully saved profile to {outfile}. Exiting.')
print('Successfully saved profile to {}. Exiting.'.format(outfile))
exit(0)
'''

View File

@@ -0,0 +1,27 @@
from baselines import logger
from baselines.common.tests.test_with_mpi import with_mpi
from baselines.common import mpi_util
@with_mpi()
def test_mpi_weighted_mean():
from mpi4py import MPI
comm = MPI.COMM_WORLD
with logger.scoped_configure(comm=comm):
if comm.rank == 0:
name2valcount = {'a' : (10, 2), 'b' : (20,3)}
elif comm.rank == 1:
name2valcount = {'a' : (19, 1), 'c' : (42,3)}
else:
raise NotImplementedError
d = mpi_util.mpi_weighted_mean(comm, name2valcount)
correctval = {'a' : (10 * 2 + 19) / 3.0, 'b' : 20, 'c' : 42}
if comm.rank == 0:
assert d == correctval, '{} != {}'.format(d, correctval)
for name, (val, count) in name2valcount.items():
for _ in range(count):
logger.logkv_mean(name, val)
d2 = logger.dumpkvs()
if comm.rank == 0:
assert d2 == correctval

View File

@@ -7,21 +7,20 @@ class FixedSequenceEnv(Env):
def __init__(
self,
n_actions=10,
seed=0,
episode_len=100
):
self.np_random = np.random.RandomState()
self.np_random.seed(seed)
self.sequence = [self.np_random.randint(0, n_actions-1) for _ in range(episode_len)]
self.sequence = None
self.action_space = Discrete(n_actions)
self.observation_space = Discrete(1)
self.episode_len = episode_len
self.time = 0
self.reset()
def reset(self):
if self.sequence is None:
self.sequence = [self.np_random.randint(0, self.action_space.n-1) for _ in range(self.episode_len)]
self.time = 0
return 0
@@ -35,6 +34,9 @@ class FixedSequenceEnv(Env):
return 0, rew, done, {}
def seed(self, seed=None):
self.np_random.seed(seed)
def _choose_next_state(self):
self.time += 1

View File

@@ -10,6 +10,7 @@ class IdentityEnv(Env):
episode_len=None
):
self.observation_space = self.action_space
self.episode_len = episode_len
self.time = 0
self.reset()
@@ -17,7 +18,6 @@ class IdentityEnv(Env):
def reset(self):
self._choose_next_state()
self.time = 0
self.observation_space = self.action_space
return self.state
@@ -26,11 +26,13 @@ class IdentityEnv(Env):
self._choose_next_state()
done = False
if self.episode_len and self.time >= self.episode_len:
rew = 0
done = True
return self.state, rew, done, {}
def seed(self, seed=None):
self.action_space.seed(seed)
def _choose_next_state(self):
self.state = self.action_space.sample()
self.time += 1
@@ -74,7 +76,7 @@ class BoxIdentityEnv(IdentityEnv):
episode_len=None,
):
self.action_space = Box(low=-1.0, high=1.0, shape=shape)
self.action_space = Box(low=-1.0, high=1.0, shape=shape, dtype=np.float32)
super().__init__(episode_len=episode_len)
def _get_reward(self, actions):

View File

@@ -9,7 +9,6 @@ from gym.spaces import Discrete, Box
class MnistEnv(Env):
def __init__(
self,
seed=0,
episode_len=None,
no_images=None
):
@@ -23,7 +22,6 @@ class MnistEnv(Env):
self.mnist = input_data.read_data_sets(mnist_path)
self.np_random = np.random.RandomState()
self.np_random.seed(seed)
self.observation_space = Box(low=0.0, high=1.0, shape=(28,28,1))
self.action_space = Discrete(10)
@@ -50,6 +48,9 @@ class MnistEnv(Env):
return self.state[0], rew, done, {}
def seed(self, seed=None):
self.np_random.seed(seed)
def train_mode(self):
self.dataset = self.mnist.train

View File

@@ -17,10 +17,10 @@ learn_kwargs = {
# 'trpo_mpi': lambda e, p: trpo_mpi.learn(policy_fn=p(env=e), env=e, max_timesteps=30000, timesteps_per_batch=100, cg_iters=10, gamma=0.9, lam=1.0, max_kl=0.001)
}
alg_list = learn_kwargs.keys()
rnn_list = ['lstm']
@pytest.mark.slow
@pytest.mark.parametrize("alg", alg_list)
@pytest.mark.parametrize("rnn", rnn_list)
@@ -33,8 +33,10 @@ def test_fixed_sequence(alg, rnn):
kwargs = learn_kwargs[alg]
kwargs.update(common_kwargs)
episode_len = 5
env_fn = lambda: FixedSequenceEnv(10, episode_len=episode_len)
if alg == 'ppo2' and rnn.endswith('lstm'):
rnn = 'ppo_' + rnn
env_fn = lambda: FixedSequenceEnv(n_actions=10, episode_len=5)
learn = lambda e: get_learn_function(alg)(
env=e,
network=rnn,
@@ -46,6 +48,3 @@ def test_fixed_sequence(alg, rnn):
if __name__ == '__main__':
test_fixed_sequence('ppo2', 'lstm')

View File

@@ -41,7 +41,7 @@ def test_mnist(alg):
learn = get_learn_function(alg)
learn_fn = lambda e: learn(env=e, **learn_kwargs)
env_fn = lambda: MnistEnv(seed=0, episode_len=100)
env_fn = lambda: MnistEnv(episode_len=100)
simple_test(env_fn, learn_fn, 0.6)

View File

@@ -1,17 +1,16 @@
import os
import gym
import tempfile
import pytest
import tensorflow as tf
import numpy as np
from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.run import get_learn_function
from baselines.common.tf_util import make_session, get_session
from functools import partial
import gym
import numpy as np
import pytest
import tensorflow as tf
from baselines.common.tests.envs.mnist_env import MnistEnv
from baselines.common.tf_util import make_session, get_session
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.run import get_learn_function
learn_kwargs = {
'deepq': {},
@@ -37,22 +36,29 @@ def test_serialization(learn_fn, network_fn):
Test if the trained model can be serialized
'''
_network_kwargs = network_kwargs[network_fn]
if network_fn.endswith('lstm') and learn_fn in ['acer', 'acktr', 'trpo_mpi', 'deepq']:
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
elif network_fn.endswith('lstm') and learn_fn == 'ppo2':
network_fn = 'ppo_' + network_fn
env = DummyVecEnv([lambda: MnistEnv(10, episode_len=100)])
def make_env():
env = MnistEnv(episode_len=100)
env.seed(10)
return env
env = DummyVecEnv([make_env])
ob = env.reset().copy()
learn = get_learn_function(learn_fn)
kwargs = {}
kwargs.update(network_kwargs[network_fn])
kwargs.update(_network_kwargs)
kwargs.update(learn_kwargs[learn_fn])
learn = partial(learn, env=env, network=network_fn, seed=0, **kwargs)
with tempfile.TemporaryDirectory() as td:
@@ -71,7 +77,7 @@ def test_serialization(learn_fn, network_fn):
for k, v in variables_dict1.items():
np.testing.assert_allclose(v, variables_dict2[k], atol=0.01,
err_msg='saved and loaded variable {} value mismatch'.format(k))
err_msg='saved and loaded variable {} value mismatch'.format(k))
np.testing.assert_allclose(mean1, mean2, atol=0.5)
np.testing.assert_allclose(std1, std2, atol=0.5)
@@ -85,15 +91,15 @@ def test_coexistence(learn_fn, network_fn):
'''
if learn_fn == 'deepq':
# TODO enable multiple DQN models to be useable at the same time
# github issue https://github.com/openai/baselines/issues/656
return
# TODO enable multiple DQN models to be useable at the same time
# github issue https://github.com/openai/baselines/issues/656
return
if network_fn.endswith('lstm') and learn_fn in ['acktr', 'trpo_mpi', 'deepq']:
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
# TODO make acktr work with recurrent policies
# and test
# github issue: https://github.com/openai/baselines/issues/660
return
env = DummyVecEnv([lambda: gym.make('CartPole-v0')])
learn = get_learn_function(learn_fn)
@@ -102,7 +108,7 @@ def test_coexistence(learn_fn, network_fn):
kwargs.update(network_kwargs[network_fn])
kwargs.update(learn_kwargs[learn_fn])
learn = partial(learn, env=env, network=network_fn, total_timesteps=0, **kwargs)
learn = partial(learn, env=env, network=network_fn, total_timesteps=0, **kwargs)
make_session(make_default=True, graph=tf.Graph())
model1 = learn(seed=1)
make_session(make_default=True, graph=tf.Graph())
@@ -112,7 +118,6 @@ def test_coexistence(learn_fn, network_fn):
model2.step(env.observation_space.sample())
def _serialize_variables():
sess = get_session()
variables = tf.trainable_variables()
@@ -131,4 +136,3 @@ def _get_action_stats(model, ob):
std = np.std(actions, axis=0)
return mean, std

View File

@@ -0,0 +1,36 @@
import os
import sys
import subprocess
import cloudpickle
import base64
import pytest
try:
from mpi4py import MPI
except ImportError:
MPI = None
def with_mpi(nproc=2, timeout=30, skip_if_no_mpi=True):
def outer_thunk(fn):
def thunk(*args, **kwargs):
serialized_fn = base64.b64encode(cloudpickle.dumps(lambda: fn(*args, **kwargs)))
subprocess.check_call([
'mpiexec','-n', str(nproc),
sys.executable,
'-m', 'baselines.common.tests.test_with_mpi',
serialized_fn
], env=os.environ, timeout=timeout)
if skip_if_no_mpi:
return pytest.mark.skipif(MPI is None, reason="MPI not present")(thunk)
else:
return thunk
return outer_thunk
if __name__ == '__main__':
if len(sys.argv) > 1:
fn = cloudpickle.loads(base64.b64decode(sys.argv[1]))
assert callable(fn)
fn()
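A hedged usage sketch of the decorator defined above (module path as added in this diff; the decorated body runs in each of the spawned MPI processes):

```python
from baselines.common.tests.test_with_mpi import with_mpi

@with_mpi(nproc=2, timeout=30)
def test_allreduce_sum():
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    assert comm.allreduce(comm.rank) == sum(range(comm.size))   # 0 + 1 == 1 with two ranks
```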

View File

@@ -1,56 +1,44 @@
import tensorflow as tf
import numpy as np
from gym.spaces import np_random
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
N_TRIALS = 10000
N_EPISODES = 100
def simple_test(env_fn, learn_fn, min_reward_fraction, n_trials=N_TRIALS):
def seeded_env_fn():
env = env_fn()
env.seed(0)
return env
np.random.seed(0)
np_random.seed(0)
env = DummyVecEnv([env_fn])
env = DummyVecEnv([seeded_env_fn])
with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default():
tf.set_random_seed(0)
model = learn_fn(env)
sum_rew = 0
done = True
for i in range(n_trials):
if done:
obs = env.reset()
state = model.initial_state
if state is not None:
a, v, state, _ = model.step(obs, S=state, M=[False])
else:
a, v, _, _ = model.step(obs)
obs, rew, done, _ = env.step(a)
sum_rew += float(rew)
print("Reward in {} trials is {}".format(n_trials, sum_rew))
assert sum_rew > min_reward_fraction * n_trials, \
'sum of rewards {} is less than {} of the total number of trials {}'.format(sum_rew, min_reward_fraction, n_trials)
def reward_per_episode_test(env_fn, learn_fn, min_avg_reward, n_trials=N_EPISODES):
env = DummyVecEnv([env_fn])
with tf.Graph().as_default(), tf.Session(config=tf.ConfigProto(allow_soft_placement=True)).as_default():
model = learn_fn(env)
N_TRIALS = 100
observations, actions, rewards = rollout(env, model, N_TRIALS)
rewards = [sum(r) for r in rewards]
avg_rew = sum(rewards) / N_TRIALS
print("Average reward in {} episodes is {}".format(n_trials, avg_rew))
assert avg_rew > min_avg_reward, \
@@ -60,14 +48,12 @@ def rollout(env, model, n_trials):
rewards = []
actions = []
observations = []
for i in range(n_trials):
obs = env.reset()
state = model.initial_state if hasattr(model, 'initial_state') else None
episode_rew = []
episode_actions = []
episode_obs = []
while True:
if state is not None:
a, v, state, _ = model.step(obs, S=state, M=[False])
@@ -75,17 +61,13 @@ def rollout(env, model, n_trials):
a,v, _, _ = model.step(obs)
obs, rew, done, _ = env.step(a)
episode_rew.append(rew)
episode_actions.append(a)
episode_obs.append(obs)
if done:
break
rewards.append(episode_rew)
actions.append(episode_actions)
observations.append(episode_obs)
return observations, actions, rewards

View File

@@ -1,4 +1,3 @@
import joblib
import numpy as np
import tensorflow as tf # pylint: ignore-module
import copy
@@ -306,12 +305,17 @@ def display_var_info(vars):
logger.info("Total model parameters: %0.2f million" % (count_params*1e-6))
def get_available_gpus():
# recipe from here:
# https://stackoverflow.com/questions/38559755/how-to-get-current-available-gpus-in-tensorflow?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa
def get_available_gpus(session_config=None):
# based on recipe from https://stackoverflow.com/a/38580201
# Unless we allocate a session here, subsequent attempts to create one
# will ignore our custom config (in particular, allow_growth=True will have
# no effect).
if session_config is None:
session_config = get_session()._config
from tensorflow.python.client import device_lib
local_device_protos = device_lib.list_local_devices()
local_device_protos = device_lib.list_local_devices(session_config)
return [x.name for x in local_device_protos if x.device_type == 'GPU']
# ================================================================
@@ -339,6 +343,7 @@ def save_state(fname, sess=None):
# TODO: ensure there is no subtle differences and remove one
def save_variables(save_path, variables=None, sess=None):
import joblib
sess = sess or get_session()
variables = variables or tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
@@ -350,6 +355,7 @@ def save_variables(save_path, variables=None, sess=None):
joblib.dump(save_dict, save_path)
def load_variables(load_path, variables=None, sess=None):
import joblib
sess = sess or get_session()
variables = variables or tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
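A hedged usage sketch of the updated helper (TF1-style `ConfigProto` assumed): per the comment in the hunk above, querying devices with the default config would allocate the GPUs up front, so a later `allow_growth=True` would have no effect; passing the custom config explicitly avoids that.

```python
import tensorflow as tf
from baselines.common.tf_util import get_available_gpus

config = tf.ConfigProto()
config.gpu_options.allow_growth = True            # only grow GPU memory as needed
print(get_available_gpus(session_config=config))  # e.g. ['/device:GPU:0'], or [] on a CPU-only box
```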

View File

@@ -1,185 +1,10 @@
from abc import ABC, abstractmethod
from baselines.common.tile_images import tile_images
from .vec_env import AlreadySteppingError, NotSteppingError, VecEnv, VecEnvWrapper, VecEnvObservationWrapper, CloudpickleWrapper
from .dummy_vec_env import DummyVecEnv
from .shmem_vec_env import ShmemVecEnv
from .subproc_vec_env import SubprocVecEnv
from .vec_frame_stack import VecFrameStack
from .vec_monitor import VecMonitor
from .vec_normalize import VecNormalize
from .vec_remove_dict_obs import VecExtractDictObs
class AlreadySteppingError(Exception):
"""
Raised when an asynchronous step is running while
step_async() is called again.
"""
def __init__(self):
msg = 'already running an async step'
Exception.__init__(self, msg)
class NotSteppingError(Exception):
"""
Raised when an asynchronous step is not running but
step_wait() is called.
"""
def __init__(self):
msg = 'not running an async step'
Exception.__init__(self, msg)
class VecEnv(ABC):
"""
An abstract asynchronous, vectorized environment.
Used to batch data from multiple copies of an environment, so that
each observation becomes an batch of observations, and expected action is a batch of actions to
be applied per-environment.
"""
closed = False
viewer = None
metadata = {
'render.modes': ['human', 'rgb_array']
}
def __init__(self, num_envs, observation_space, action_space):
self.num_envs = num_envs
self.observation_space = observation_space
self.action_space = action_space
@abstractmethod
def reset(self):
"""
Reset all the environments and return an array of
observations, or a dict of observation arrays.
If step_async is still doing work, that work will
be cancelled and step_wait() should not be called
until step_async() is invoked again.
"""
pass
@abstractmethod
def step_async(self, actions):
"""
Tell all the environments to start taking a step
with the given actions.
Call step_wait() to get the results of the step.
You should not call this if a step_async run is
already pending.
"""
pass
@abstractmethod
def step_wait(self):
"""
Wait for the step taken with step_async().
Returns (obs, rews, dones, infos):
- obs: an array of observations, or a dict of
arrays of observations.
- rews: an array of rewards
- dones: an array of "episode done" booleans
- infos: a sequence of info objects
"""
pass
def close_extras(self):
"""
Clean up the extra resources, beyond what's in this base class.
Only runs when not self.closed.
"""
pass
def close(self):
if self.closed:
return
if self.viewer is not None:
self.viewer.close()
self.close_extras()
self.closed = True
def step(self, actions):
"""
Step the environments synchronously.
This is available for backwards compatibility.
"""
self.step_async(actions)
return self.step_wait()
def render(self, mode='human'):
imgs = self.get_images()
bigimg = tile_images(imgs)
if mode == 'human':
self.get_viewer().imshow(bigimg)
return self.get_viewer().isopen
elif mode == 'rgb_array':
return bigimg
else:
raise NotImplementedError
def get_images(self):
"""
Return RGB images from each environment
"""
raise NotImplementedError
@property
def unwrapped(self):
if isinstance(self, VecEnvWrapper):
return self.venv.unwrapped
else:
return self
def get_viewer(self):
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.SimpleImageViewer()
return self.viewer
class VecEnvWrapper(VecEnv):
"""
An environment wrapper that applies to an entire batch
of environments at once.
"""
def __init__(self, venv, observation_space=None, action_space=None):
self.venv = venv
VecEnv.__init__(self,
num_envs=venv.num_envs,
observation_space=observation_space or venv.observation_space,
action_space=action_space or venv.action_space)
def step_async(self, actions):
self.venv.step_async(actions)
@abstractmethod
def reset(self):
pass
@abstractmethod
def step_wait(self):
pass
def close(self):
return self.venv.close()
def render(self, mode='human'):
return self.venv.render(mode=mode)
def get_images(self):
return self.venv.get_images()
class CloudpickleWrapper(object):
"""
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
"""
def __init__(self, x):
self.x = x
def __getstate__(self):
import cloudpickle
return cloudpickle.dumps(self.x)
def __setstate__(self, ob):
import pickle
self.x = pickle.loads(ob)
__all__ = ['AlreadySteppingError', 'NotSteppingError', 'VecEnv', 'VecEnvWrapper', 'VecEnvObservationWrapper', 'CloudpickleWrapper', 'DummyVecEnv', 'ShmemVecEnv', 'SubprocVecEnv', 'VecFrameStack', 'VecMonitor', 'VecNormalize', 'VecExtractDictObs']
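
A small sanity check, not from the diff: after the refactor the concrete classes are defined in vec_env.py, while the package __init__ only re-exports them, so both import paths resolve to the same objects.

from baselines.common.vec_env import VecEnv, VecFrameStack, VecNormalize
from baselines.common.vec_env.vec_env import VecEnv as VecEnvFromModule

assert VecEnv is VecEnvFromModule   # package re-export and module definition are the same class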


@@ -1,6 +1,5 @@
import numpy as np
from gym import spaces
from . import VecEnv
from .vec_env import VecEnv
from .util import copy_obs_dict, dict_to_obs, obs_space_info
class DummyVecEnv(VecEnv):
@@ -27,7 +26,7 @@ class DummyVecEnv(VecEnv):
self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32)
self.buf_infos = [{} for _ in range(self.num_envs)]
self.actions = None
self.specs = [e.spec for e in self.envs]
self.spec = self.envs[0].spec
def step_async(self, actions):
listify = True
@@ -46,8 +45,8 @@ class DummyVecEnv(VecEnv):
def step_wait(self):
for e in range(self.num_envs):
action = self.actions[e]
if isinstance(self.envs[e].action_space, spaces.Discrete):
action = int(action)
# if isinstance(self.envs[e].action_space, spaces.Discrete):
# action = int(action)
obs, self.buf_rews[e], self.buf_dones[e], self.buf_infos[e] = self.envs[e].step(action)
if self.buf_dones[e]:


@@ -2,9 +2,9 @@
An interface for asynchronous vectorized environments.
"""
from multiprocessing import Pipe, Array, Process
import multiprocessing as mp
import numpy as np
from . import VecEnv, CloudpickleWrapper
from .vec_env import VecEnv, CloudpickleWrapper, clear_mpi_env_vars
import ctypes
from baselines import logger
@@ -22,11 +22,12 @@ class ShmemVecEnv(VecEnv):
Optimized version of SubprocVecEnv that uses shared variables to communicate observations.
"""
def __init__(self, env_fns, spaces=None):
def __init__(self, env_fns, spaces=None, context='spawn'):
"""
If you don't specify observation_space, we'll have to create a dummy
environment to get it.
"""
ctx = mp.get_context(context)
if spaces:
observation_space, action_space = spaces
else:
@@ -39,22 +40,22 @@ class ShmemVecEnv(VecEnv):
VecEnv.__init__(self, len(env_fns), observation_space, action_space)
self.obs_keys, self.obs_shapes, self.obs_dtypes = obs_space_info(observation_space)
self.obs_bufs = [
{k: Array(_NP_TO_CT[self.obs_dtypes[k].type], int(np.prod(self.obs_shapes[k]))) for k in self.obs_keys}
{k: ctx.Array(_NP_TO_CT[self.obs_dtypes[k].type], int(np.prod(self.obs_shapes[k]))) for k in self.obs_keys}
for _ in env_fns]
self.parent_pipes = []
self.procs = []
for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
wrapped_fn = CloudpickleWrapper(env_fn)
parent_pipe, child_pipe = Pipe()
proc = Process(target=_subproc_worker,
args=(child_pipe, parent_pipe, wrapped_fn, obs_buf, self.obs_shapes, self.obs_dtypes, self.obs_keys))
proc.daemon = True
self.procs.append(proc)
self.parent_pipes.append(parent_pipe)
proc.start()
child_pipe.close()
with clear_mpi_env_vars():
for env_fn, obs_buf in zip(env_fns, self.obs_bufs):
wrapped_fn = CloudpickleWrapper(env_fn)
parent_pipe, child_pipe = ctx.Pipe()
proc = ctx.Process(target=_subproc_worker,
args=(child_pipe, parent_pipe, wrapped_fn, obs_buf, self.obs_shapes, self.obs_dtypes, self.obs_keys))
proc.daemon = True
self.procs.append(proc)
self.parent_pipes.append(parent_pipe)
proc.start()
child_pipe.close()
self.waiting_step = False
self.specs = [f().spec for f in env_fns]
self.viewer = None
def reset(self):
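
Usage sketch (assuming gym and the Atari ROMs are installed): the new context argument selects the multiprocessing start method; 'spawn' avoids fork-related issues with TensorFlow and MPI, while 'fork' can be faster on Linux.

import gym
from baselines.common.vec_env import ShmemVecEnv

env_fns = [lambda: gym.make('PongNoFrameskip-v4') for _ in range(4)]
venv = ShmemVecEnv(env_fns, context='spawn')   # or context='fork' on Linux
obs = venv.reset()                             # raw frames, roughly shape (4, 210, 160, 3)
venv.close()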


@@ -1,6 +1,8 @@
import multiprocessing as mp
import numpy as np
from multiprocessing import Process, Pipe
from . import VecEnv, CloudpickleWrapper
from .vec_env import VecEnv, CloudpickleWrapper, clear_mpi_env_vars
def worker(remote, parent_remote, env_fn_wrapper):
parent_remote.close()
@@ -21,8 +23,8 @@ def worker(remote, parent_remote, env_fn_wrapper):
elif cmd == 'close':
remote.close()
break
elif cmd == 'get_spaces':
remote.send((env.observation_space, env.action_space))
elif cmd == 'get_spaces_spec':
remote.send((env.observation_space, env.action_space, env.spec))
else:
raise NotImplementedError
except KeyboardInterrupt:
@@ -36,7 +38,7 @@ class SubprocVecEnv(VecEnv):
VecEnv that runs multiple environments in parallel in subprocesses and communicates with them via pipes.
Recommended to use when num_envs > 1 and step() can be a bottleneck.
"""
def __init__(self, env_fns, spaces=None):
def __init__(self, env_fns, spaces=None, context='spawn'):
"""
Arguments:
@@ -45,19 +47,20 @@ class SubprocVecEnv(VecEnv):
self.waiting = False
self.closed = False
nenvs = len(env_fns)
self.remotes, self.work_remotes = zip(*[Pipe() for _ in range(nenvs)])
self.ps = [Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
ctx = mp.get_context(context)
self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(nenvs)])
self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
for p in self.ps:
p.daemon = True # if the main process crashes, we should not cause things to hang
p.start()
with clear_mpi_env_vars():
p.start()
for remote in self.work_remotes:
remote.close()
self.remotes[0].send(('get_spaces', None))
observation_space, action_space = self.remotes[0].recv()
self.remotes[0].send(('get_spaces_spec', None))
observation_space, action_space, self.spec = self.remotes[0].recv()
self.viewer = None
self.specs = [f().spec for f in env_fns]
VecEnv.__init__(self, len(env_fns), observation_space, action_space)
def step_async(self, actions):
@@ -99,16 +102,16 @@ class SubprocVecEnv(VecEnv):
def _assert_not_closed(self):
assert not self.closed, "Trying to operate on a SubprocVecEnv after calling close()"
def __del__(self):
if not self.closed:
self.close()
def _flatten_obs(obs):
assert isinstance(obs, list) or isinstance(obs, tuple)
assert isinstance(obs, (list, tuple))
assert len(obs) > 0
if isinstance(obs[0], dict):
import collections
assert isinstance(obs, collections.OrderedDict)
keys = obs[0].keys()
return {k: np.stack([o[k] for o in obs]) for k in keys}
else:
return np.stack(obs)


@@ -8,39 +8,40 @@ import pytest
from .dummy_vec_env import DummyVecEnv
from .shmem_vec_env import ShmemVecEnv
from .subproc_vec_env import SubprocVecEnv
from baselines.common.tests.test_with_mpi import with_mpi
def assert_envs_equal(env1, env2, num_steps):
def assert_venvs_equal(venv1, venv2, num_steps):
"""
Compare two environments over num_steps steps and make sure
that the observations produced by each are the same when given
the same actions.
"""
assert env1.num_envs == env2.num_envs
assert env1.action_space.shape == env2.action_space.shape
assert env1.action_space.dtype == env2.action_space.dtype
joint_shape = (env1.num_envs,) + env1.action_space.shape
assert venv1.num_envs == venv2.num_envs
assert venv1.observation_space.shape == venv2.observation_space.shape
assert venv1.observation_space.dtype == venv2.observation_space.dtype
assert venv1.action_space.shape == venv2.action_space.shape
assert venv1.action_space.dtype == venv2.action_space.dtype
try:
obs1, obs2 = env1.reset(), env2.reset()
obs1, obs2 = venv1.reset(), venv2.reset()
assert np.array(obs1).shape == np.array(obs2).shape
assert np.array(obs1).shape == joint_shape
assert np.array(obs1).shape == (venv1.num_envs,) + venv1.observation_space.shape
assert np.allclose(obs1, obs2)
np.random.seed(1337)
venv1.action_space.seed(1337)
for _ in range(num_steps):
actions = np.array(np.random.randint(0, 0x100, size=joint_shape),
dtype=env1.action_space.dtype)
for env in [env1, env2]:
env.step_async(actions)
outs1 = env1.step_wait()
outs2 = env2.step_wait()
actions = np.array([venv1.action_space.sample() for _ in range(venv1.num_envs)])
for venv in [venv1, venv2]:
venv.step_async(actions)
outs1 = venv1.step_wait()
outs2 = venv2.step_wait()
for out1, out2 in zip(outs1[:3], outs2[:3]):
assert np.array(out1).shape == np.array(out2).shape
assert np.allclose(out1, out2)
assert list(outs1[3]) == list(outs2[3])
finally:
env1.close()
env2.close()
venv1.close()
venv2.close()
@pytest.mark.parametrize('klass', (ShmemVecEnv, SubprocVecEnv))
@@ -63,7 +64,7 @@ def test_vec_env(klass, dtype): # pylint: disable=R0914
fns = [make_fn(i) for i in range(num_envs)]
env1 = DummyVecEnv(fns)
env2 = klass(fns)
assert_envs_equal(env1, env2, num_steps=num_steps)
assert_venvs_equal(env1, env2, num_steps=num_steps)
class SimpleEnv(gym.Env):
@@ -99,3 +100,15 @@ class SimpleEnv(gym.Env):
def render(self, mode=None):
raise NotImplementedError
@with_mpi()
def test_mpi_with_subprocvecenv():
shape = (2,3,4)
nenv = 1
venv = SubprocVecEnv([lambda: SimpleEnv(0, shape, 'float32')] * nenv)
ob = venv.reset()
venv.close()
assert ob.shape == (nenv,) + shape


@@ -0,0 +1,219 @@
import contextlib
import os
from abc import ABC, abstractmethod
from baselines.common.tile_images import tile_images
class AlreadySteppingError(Exception):
"""
Raised when an asynchronous step is running while
step_async() is called again.
"""
def __init__(self):
msg = 'already running an async step'
Exception.__init__(self, msg)
class NotSteppingError(Exception):
"""
Raised when an asynchronous step is not running but
step_wait() is called.
"""
def __init__(self):
msg = 'not running an async step'
Exception.__init__(self, msg)
class VecEnv(ABC):
"""
An abstract asynchronous, vectorized environment.
Used to batch data from multiple copies of an environment, so that
each observation becomes a batch of observations, and the expected action is a batch of actions to
be applied per-environment.
"""
closed = False
viewer = None
metadata = {
'render.modes': ['human', 'rgb_array']
}
def __init__(self, num_envs, observation_space, action_space):
self.num_envs = num_envs
self.observation_space = observation_space
self.action_space = action_space
@abstractmethod
def reset(self):
"""
Reset all the environments and return an array of
observations, or a dict of observation arrays.
If step_async is still doing work, that work will
be cancelled and step_wait() should not be called
until step_async() is invoked again.
"""
pass
@abstractmethod
def step_async(self, actions):
"""
Tell all the environments to start taking a step
with the given actions.
Call step_wait() to get the results of the step.
You should not call this if a step_async run is
already pending.
"""
pass
@abstractmethod
def step_wait(self):
"""
Wait for the step taken with step_async().
Returns (obs, rews, dones, infos):
- obs: an array of observations, or a dict of
arrays of observations.
- rews: an array of rewards
- dones: an array of "episode done" booleans
- infos: a sequence of info objects
"""
pass
def close_extras(self):
"""
Clean up the extra resources, beyond what's in this base class.
Only runs when not self.closed.
"""
pass
def close(self):
if self.closed:
return
if self.viewer is not None:
self.viewer.close()
self.close_extras()
self.closed = True
def step(self, actions):
"""
Step the environments synchronously.
This is available for backwards compatibility.
"""
self.step_async(actions)
return self.step_wait()
def render(self, mode='human'):
imgs = self.get_images()
bigimg = tile_images(imgs)
if mode == 'human':
self.get_viewer().imshow(bigimg)
return self.get_viewer().isopen
elif mode == 'rgb_array':
return bigimg
else:
raise NotImplementedError
def get_images(self):
"""
Return RGB images from each environment
"""
raise NotImplementedError
@property
def unwrapped(self):
if isinstance(self, VecEnvWrapper):
return self.venv.unwrapped
else:
return self
def get_viewer(self):
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.SimpleImageViewer()
return self.viewer
class VecEnvWrapper(VecEnv):
"""
An environment wrapper that applies to an entire batch
of environments at once.
"""
def __init__(self, venv, observation_space=None, action_space=None):
self.venv = venv
VecEnv.__init__(self,
num_envs=venv.num_envs,
observation_space=observation_space or venv.observation_space,
action_space=action_space or venv.action_space)
def step_async(self, actions):
self.venv.step_async(actions)
@abstractmethod
def reset(self):
pass
@abstractmethod
def step_wait(self):
pass
def close(self):
return self.venv.close()
def render(self, mode='human'):
return self.venv.render(mode=mode)
def get_images(self):
return self.venv.get_images()
class VecEnvObservationWrapper(VecEnvWrapper):
@abstractmethod
def process(self, obs):
pass
def reset(self):
obs = self.venv.reset()
return self.process(obs)
def step_wait(self):
obs, rews, dones, infos = self.venv.step_wait()
return self.process(obs), rews, dones, infos
class CloudpickleWrapper(object):
"""
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
"""
def __init__(self, x):
self.x = x
def __getstate__(self):
import cloudpickle
return cloudpickle.dumps(self.x)
def __setstate__(self, ob):
import pickle
self.x = pickle.loads(ob)
@contextlib.contextmanager
def clear_mpi_env_vars():
"""
Importing mpi4py (from mpi4py import MPI) calls MPI_Init by default. If the child process inherits MPI environment variables, MPI will think the child is an MPI process just like the parent and may misbehave, e.g. hang.
This context manager is a hacky way to clear those environment variables temporarily, such as when we are starting multiprocessing
Processes.
"""
removed_environment = {}
for k, v in list(os.environ.items()):
for prefix in ['OMPI_', 'PMI_']:
if k.startswith(prefix):
removed_environment[k] = v
del os.environ[k]
try:
yield
finally:
os.environ.update(removed_environment)
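
A sketch of what clear_mpi_env_vars is for (the _worker function here is hypothetical): hide OMPI_*/PMI_* variables while starting workers so the children do not try to initialize MPI as extra ranks.

import multiprocessing as mp
from baselines.common.vec_env.vec_env import clear_mpi_env_vars

def _worker():
    # a child started under clear_mpi_env_vars() does not inherit OMPI_*/PMI_* variables
    print('worker started')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with clear_mpi_env_vars():
        proc = ctx.Process(target=_worker)
        proc.start()
    proc.join()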


@@ -1,4 +1,4 @@
from . import VecEnvWrapper
from .vec_env import VecEnvWrapper
import numpy as np
from gym import spaces


@@ -2,15 +2,23 @@ from . import VecEnvWrapper
from baselines.bench.monitor import ResultsWriter
import numpy as np
import time
from collections import deque
class VecMonitor(VecEnvWrapper):
def __init__(self, venv, filename=None):
def __init__(self, venv, filename=None, keep_buf=0):
VecEnvWrapper.__init__(self, venv)
self.eprets = None
self.eplens = None
self.epcount = 0
self.tstart = time.time()
self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart})
if filename:
self.results_writer = ResultsWriter(filename, header={'t_start': self.tstart})
else:
self.results_writer = None
self.keep_buf = keep_buf
if self.keep_buf:
self.epret_buf = deque([], maxlen=keep_buf)
self.eplen_buf = deque([], maxlen=keep_buf)
def reset(self):
obs = self.venv.reset()
@@ -28,10 +36,14 @@ class VecMonitor(VecEnvWrapper):
if done:
epinfo = {'r': ret, 'l': eplen, 't': round(time.time() - self.tstart, 6)}
info['episode'] = epinfo
if self.keep_buf:
self.epret_buf.append(ret)
self.eplen_buf.append(eplen)
self.epcount += 1
self.eprets[i] = 0
self.eplens[i] = 0
self.results_writer.write_row(epinfo)
if self.results_writer:
self.results_writer.write_row(epinfo)
newinfos.append(info)
return obs, rews, dones, newinfos
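
Usage sketch (assuming gym's CartPole-v0): keep_buf keeps the last N episode returns and lengths in in-memory deques, so running averages can be reported without writing a monitor file.

import gym
import numpy as np
from baselines.common.vec_env import DummyVecEnv, VecMonitor

venv = VecMonitor(DummyVecEnv([lambda: gym.make('CartPole-v0')]), keep_buf=100)
obs = venv.reset()
for _ in range(500):
    obs, rews, dones, infos = venv.step([venv.action_space.sample()])
print('episodes finished:', venv.epcount)
print('mean return of last 100 episodes:', np.mean(venv.epret_buf))
venv.close()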


@@ -0,0 +1,11 @@
from .vec_env import VecEnvObservationWrapper
class VecExtractDictObs(VecEnvObservationWrapper):
def __init__(self, venv, key):
self.key = key
super().__init__(venv=venv,
observation_space=venv.observation_space.spaces[self.key])
def process(self, obs):
return obs[self.key]
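
A sketch of the new wrapper; ToyDictEnv below is a made-up environment with a Dict observation space, standing in for any real dict-observation env.

import gym
import numpy as np
from gym import spaces
from baselines.common.vec_env import DummyVecEnv, VecExtractDictObs

class ToyDictEnv(gym.Env):
    """Minimal dict-observation env, for illustration only."""
    observation_space = spaces.Dict({
        'image': spaces.Box(low=0, high=255, shape=(8, 8, 3), dtype=np.uint8),
        'position': spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32),
    })
    action_space = spaces.Discrete(2)

    def reset(self):
        return self.observation_space.sample()

    def step(self, action):
        return self.observation_space.sample(), 0.0, False, {}

venv = VecExtractDictObs(DummyVecEnv([ToyDictEnv for _ in range(2)]), key='image')
print(venv.observation_space)   # only the 'image' Box remains
print(venv.reset().shape)       # (2, 8, 8, 3) -- a plain array, no longer a dict
venv.close()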


@@ -0,0 +1,19 @@
import gym
class TimeLimit(gym.Wrapper):
def __init__(self, env, max_episode_steps=None):
super(TimeLimit, self).__init__(env)
self._max_episode_steps = max_episode_steps
self._elapsed_steps = 0
def step(self, ac):
observation, reward, done, info = self.env.step(ac)
self._elapsed_steps += 1
if self._elapsed_steps >= self._max_episode_steps:
done = True
info['TimeLimit.truncated'] = True
return observation, reward, done, info
def reset(self, **kwargs):
self._elapsed_steps = 0
return self.env.reset(**kwargs)
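
Usage sketch (the module path baselines.common.wrappers is assumed for the new wrapper): truncate episodes after a fixed number of steps and flag the truncation in info.

import gym
from baselines.common.wrappers import TimeLimit  # assumed location of the new wrapper

env = TimeLimit(gym.make('CartPole-v0').unwrapped, max_episode_steps=50)
ob, done, steps = env.reset(), False, 0
while not done:
    ob, rew, done, info = env.step(env.action_space.sample())
    steps += 1
print(steps, info.get('TimeLimit.truncated', False))  # at most 50 steps; True only if truncated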


@@ -17,7 +17,7 @@ except ImportError:
def normalize(x, stats):
if stats is None:
return x
return (x - stats.mean) / stats.std
return (x - stats.mean) / (stats.std + 1e-8)
def denormalize(x, stats):
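
A toy check, not from the diff, of why the epsilon matters: with a degenerate running std of zero, the old expression produced NaNs.

import numpy as np

x, mean, std = np.float32(3.0), np.float32(3.0), np.float32(0.0)
print((x - mean) / std)           # nan (with a divide warning) -- old behaviour
print((x - mean) / (std + 1e-8))  # 0.0 -- behaviour after the fix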


@@ -1,7 +1,10 @@
from baselines.run import main as M
from multiprocessing import Process
import baselines.run
def _run(argstr):
M(('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr).split(' '))
p = Process(target=baselines.run.main, args=('--alg=ddpg --env=Pendulum-v0 --num_timesteps=0 ' + argstr).split(' '))
p.start()
p.join()
def test_popart():
_run('--normalize_returns=True --popart=True')


@@ -23,7 +23,7 @@ def model(inpt, num_actions, scope, reuse=False):
if __name__ == '__main__':
with U.make_session(8):
with U.make_session(num_cpu=8):
# Create the environment
env = gym.make("CartPole-v0")
# Create all the functions necessary to train the model


@@ -20,7 +20,7 @@ class TfInput(object):
"""
raise NotImplementedError
def make_feed_dict(data):
def make_feed_dict(self, data):
"""Given data input it to the placeholder(s)."""
raise NotImplementedError


@@ -12,13 +12,13 @@ Download the expert data into `./data`, [download link](https://drive.google.com
### Step 2: Run GAIL
Run with single thread:
Run with single rank:
```bash
python -m baselines.gail.run_mujoco
```
Run with multiple threads:
Run with multiple ranks:
```bash
mpirun -np 16 python -m baselines.gail.run_mujoco


@@ -66,7 +66,7 @@ class TransitionClassifier(object):
with tf.variable_scope("obfilter"):
self.obs_rms = RunningMeanStd(shape=self.observation_shape)
obs = (obs_ph - self.obs_rms.mean / self.obs_rms.std)
obs = (obs_ph - self.obs_rms.mean) / self.obs_rms.std
_input = tf.concat([obs, acs_ph], axis=1) # concatenate the two inputs to form a transition
p_h1 = tf.contrib.layers.fully_connected(_input, self.hidden_size, activation_fn=tf.nn.tanh)
p_h2 = tf.contrib.layers.fully_connected(p_h1, self.hidden_size, activation_fn=tf.nn.tanh)
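
A quick numeric illustration, not from the diff, of the operator-precedence bug fixed above: the old expression divided only the mean by the std.

import numpy as np

obs, mean, std = np.float32(4.0), np.float32(2.0), np.float32(2.0)
print(obs - mean / std)    # 3.0 -- old code: obs - (mean / std)
print((obs - mean) / std)  # 1.0 -- fixed code: subtract the mean first, then divide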


@@ -50,8 +50,12 @@ class Mujoco_Dset(object):
# obs, acs: shape (N, L, ) + S where N = # episodes, L = episode length
# and S is the environment observation/action space.
# Flatten to (N * L, prod(S))
self.obs = np.reshape(obs, [-1, np.prod(obs.shape[2:])])
self.acs = np.reshape(acs, [-1, np.prod(acs.shape[2:])])
if len(obs.shape) > 2:
self.obs = np.reshape(obs, [-1, np.prod(obs.shape[2:])])
self.acs = np.reshape(acs, [-1, np.prod(acs.shape[2:])])
else:
self.obs = np.vstack(obs)
self.acs = np.vstack(acs)
self.rets = traj_data['ep_rets'][:traj_limitation]
self.avg_ret = sum(self.rets)/len(self.rets)
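
A small sketch, not from the diff, of why the branch is needed: fixed-length episodes load as a 3-D array that can be reshaped directly, while variable-length episodes load as a 1-D object array that has to be stacked.

import numpy as np

fixed = np.zeros((3, 5, 2))  # 3 episodes x 5 steps x 2-dim observations
ragged = np.array([np.zeros((5, 2)), np.zeros((7, 2))], dtype=object)  # episode lengths 5 and 7

print(np.reshape(fixed, [-1, int(np.prod(fixed.shape[2:]))]).shape)  # (15, 2)
print(len(ragged.shape))                                             # 1 -> falls into the vstack branch
print(np.vstack(ragged).shape)                                       # (12, 2)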


@@ -108,7 +108,7 @@ def learn(*, network, env, total_timesteps,
# Prepare params.
params = config.DEFAULT_PARAMS
env_name = env.specs[0].id
env_name = env.spec.id
params['env_name'] = env_name
params['replay_strategy'] = replay_strategy
if env_name in config.DEFAULT_ENV_PARAMS:


@@ -7,6 +7,7 @@ import time
import datetime
import tempfile
from collections import defaultdict
from contextlib import contextmanager
DEBUG = 10
INFO = 20
@@ -68,7 +69,8 @@ class HumanOutputFormat(KVWriter, SeqWriter):
self.file.flush()
def _truncate(self, s):
return s[:20] + '...' if len(s) > 23 else s
maxlen = 30
return s[:maxlen-3] + '...' if len(s) > maxlen else s
def writeseq(self, seq):
seq = list(seq)
@@ -195,13 +197,13 @@ def logkv(key, val):
Call this once for each diagnostic quantity, each iteration
If called many times, last value will be used.
"""
Logger.CURRENT.logkv(key, val)
get_current().logkv(key, val)
def logkv_mean(key, val):
"""
The same as logkv(), but if called many times, values averaged.
"""
Logger.CURRENT.logkv_mean(key, val)
get_current().logkv_mean(key, val)
def logkvs(d):
"""
@@ -213,21 +215,18 @@ def logkvs(d):
def dumpkvs():
"""
Write all of the diagnostics from the current iteration
level: int. (see logger.py docs) If the global logger level is higher than
the level argument here, don't print to stdout.
"""
Logger.CURRENT.dumpkvs()
return get_current().dumpkvs()
def getkvs():
return Logger.CURRENT.name2val
return get_current().name2val
def log(*args, level=INFO):
"""
Write the sequence of args, with no separators, to the console and output files (if you've configured an output file).
"""
Logger.CURRENT.log(*args, level=level)
get_current().log(*args, level=level)
def debug(*args):
log(*args, level=DEBUG)
@@ -246,30 +245,29 @@ def set_level(level):
"""
Set logging threshold on current logger.
"""
Logger.CURRENT.set_level(level)
get_current().set_level(level)
def set_comm(comm):
get_current().set_comm(comm)
def get_dir():
"""
Get directory that log files are being written to.
will be None if there is no output directory (i.e., if you didn't call start)
"""
return Logger.CURRENT.get_dir()
return get_current().get_dir()
record_tabular = logkv
dump_tabular = dumpkvs
class ProfileKV:
"""
Usage:
with logger.ProfileKV("interesting_scope"):
code
"""
def __init__(self, n):
self.n = "wait_" + n
def __enter__(self):
self.t1 = time.time()
def __exit__(self ,type, value, traceback):
Logger.CURRENT.name2val[self.n] += time.time() - self.t1
@contextmanager
def profile_kv(scopename):
logkey = 'wait_' + scopename
tstart = time.time()
try:
yield
finally:
get_current().name2val[logkey] += time.time() - tstart
def profile(n):
"""
@@ -279,7 +277,7 @@ def profile(n):
"""
def decorator_with_name(func):
def func_wrapper(*args, **kwargs):
with ProfileKV(n):
with profile_kv(n):
return func(*args, **kwargs)
return func_wrapper
return decorator_with_name
@@ -289,17 +287,25 @@ def profile(n):
# Backend
# ================================================================
def get_current():
if Logger.CURRENT is None:
_configure_default_logger()
return Logger.CURRENT
class Logger(object):
DEFAULT = None # A logger with no output files. (See right below class definition)
# So that you can still log to the terminal without setting up any output files
CURRENT = None # Current logger being used by the free functions above
def __init__(self, dir, output_formats):
def __init__(self, dir, output_formats, comm=None):
self.name2val = defaultdict(float) # values this iteration
self.name2cnt = defaultdict(int)
self.level = INFO
self.dir = dir
self.output_formats = output_formats
self.comm = comm
# Logging API, forwarded
# ----------------------------------------
@@ -307,20 +313,27 @@ class Logger(object):
self.name2val[key] = val
def logkv_mean(self, key, val):
if val is None:
self.name2val[key] = None
return
oldval, cnt = self.name2val[key], self.name2cnt[key]
self.name2val[key] = oldval*cnt/(cnt+1) + val/(cnt+1)
self.name2cnt[key] = cnt + 1
def dumpkvs(self):
if self.level == DISABLED: return
if self.comm is None:
d = self.name2val
else:
from baselines.common import mpi_util
d = mpi_util.mpi_weighted_mean(self.comm,
{name : (val, self.name2cnt.get(name, 1))
for (name, val) in self.name2val.items()})
if self.comm.rank != 0:
d['dummy'] = 1 # so we don't get a warning about empty dict
out = d.copy() # Return the dict for unit testing purposes
for fmt in self.output_formats:
if isinstance(fmt, KVWriter):
fmt.writekvs(self.name2val)
fmt.writekvs(d)
self.name2val.clear()
self.name2cnt.clear()
return out
def log(self, *args, level=INFO):
if self.level <= level:
@@ -331,6 +344,9 @@ class Logger(object):
def set_level(self, level):
self.level = level
def set_comm(self, comm):
self.comm = comm
def get_dir(self):
return self.dir
@@ -345,7 +361,10 @@ class Logger(object):
if isinstance(fmt, SeqWriter):
fmt.writeseq(map(str, args))
def configure(dir=None, format_strs=None):
def configure(dir=None, format_strs=None, comm=None):
"""
If comm is provided, average all numerical stats across that comm
"""
if dir is None:
dir = os.getenv('OPENAI_LOGDIR')
if dir is None:
@@ -372,15 +391,11 @@ def configure(dir=None, format_strs=None):
format_strs = filter(None, format_strs)
output_formats = [make_output_format(f, dir, log_suffix) for f in format_strs]
Logger.CURRENT = Logger(dir=dir, output_formats=output_formats)
Logger.CURRENT = Logger(dir=dir, output_formats=output_formats, comm=comm)
log('Logging to %s'%dir)
def _configure_default_logger():
format_strs = None
# keep the old default of only writing to stdout
if 'OPENAI_LOG_FORMAT' not in os.environ:
format_strs = ['stdout']
configure(format_strs=format_strs)
configure()
Logger.DEFAULT = Logger.CURRENT
def reset():
@@ -389,17 +404,15 @@ def reset():
Logger.CURRENT = Logger.DEFAULT
log('Reset logger')
class scoped_configure(object):
def __init__(self, dir=None, format_strs=None):
self.dir = dir
self.format_strs = format_strs
self.prevlogger = None
def __enter__(self):
self.prevlogger = Logger.CURRENT
configure(dir=self.dir, format_strs=self.format_strs)
def __exit__(self, *args):
@contextmanager
def scoped_configure(dir=None, format_strs=None, comm=None):
prevlogger = Logger.CURRENT
configure(dir=dir, format_strs=format_strs, comm=comm)
try:
yield
finally:
Logger.CURRENT.close()
Logger.CURRENT = self.prevlogger
Logger.CURRENT = prevlogger
# ================================================================
@@ -423,7 +436,7 @@ def _demo():
logkv_mean("b", -44.4)
logkv("a", 5.5)
dumpkvs()
info("^^^ should see b = 33.3")
info("^^^ should see b = -33.3")
logkv("b", -2.5)
dumpkvs()
@@ -456,7 +469,6 @@ def read_tb(path):
import pandas
import numpy as np
from glob import glob
from collections import defaultdict
import tensorflow as tf
if osp.isdir(path):
fnames = glob(osp.join(path, "events.*"))
@@ -482,8 +494,5 @@ def read_tb(path):
data[step-1, colidx] = value
return pandas.DataFrame(data, columns=tags)
# configure the default logger on import
_configure_default_logger()
if __name__ == "__main__":
_demo()
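
A short sketch of the reworked logger API shown above: configure once (optionally passing an MPI comm so dumpkvs() averages numeric stats across ranks), and time code blocks with the new profile_kv context manager.

from baselines import logger

logger.configure(format_strs=['stdout'])   # pass comm=MPI.COMM_WORLD to average stats across ranks
with logger.profile_kv('fake_rollout'):
    total = sum(i * i for i in range(10000))   # stand-in for real work
logger.logkv('total', total)
logger.dumpkvs()   # prints 'total' and 'wait_fake_rollout'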


@@ -97,7 +97,6 @@ def learn(env, policy_fn, *,
ret = tf.placeholder(dtype=tf.float32, shape=[None]) # Empirical return
lrmult = tf.placeholder(name='lrmult', dtype=tf.float32, shape=[]) # learning rate multiplier, updated with schedule
clip_param = clip_param * lrmult # Annealed clipping parameter epsilon
ob = U.get_placeholder_cached(name="ob")
ac = pi.pdtype.sample_placeholder([None])
@@ -168,7 +167,7 @@ def learn(env, policy_fn, *,
ob, ac, atarg, tdlamret = seg["ob"], seg["ac"], seg["adv"], seg["tdlamret"]
vpredbefore = seg["vpred"] # predicted value function before update
atarg = (atarg - atarg.mean()) / atarg.std() # standardized advantage function estimate
d = Dataset(dict(ob=ob, ac=ac, atarg=atarg, vtarg=tdlamret), shuffle=not pi.recurrent)
d = Dataset(dict(ob=ob, ac=ac, atarg=atarg, vtarg=tdlamret), deterministic=pi.recurrent)
optim_batchsize = optim_batchsize or ob.shape[0]
if hasattr(pi, "ob_rms"): pi.ob_rms.update(ob) # update running mean/std for policy


@@ -19,16 +19,17 @@ def train(num_timesteps, seed, model_path=None):
# these are good enough to make humanoid walk, but whether they are
# the absolute best is not certain
env = RewScale(env, 0.1)
logger.log("NOTE: reward will be scaled by a factor of 10 in logged stats. Check the monitor for unscaled reward.")
pi = pposgd_simple.learn(env, policy_fn,
max_timesteps=num_timesteps,
timesteps_per_actorbatch=2048,
clip_param=0.2, entcoeff=0.0,
clip_param=0.1, entcoeff=0.0,
optim_epochs=10,
optim_stepsize=3e-4,
optim_stepsize=1e-4,
optim_batchsize=64,
gamma=0.99,
lam=0.95,
schedule='linear',
schedule='constant',
)
env.close()
if model_path:
@@ -47,7 +48,7 @@ def main():
logger.configure()
parser = mujoco_arg_parser()
parser.add_argument('--model-path', default=os.path.join(logger.get_dir(), 'humanoid_policy'))
parser.set_defaults(num_timesteps=int(2e7))
parser.set_defaults(num_timesteps=int(5e7))
args = parser.parse_args()
@@ -68,8 +69,5 @@ def main():
if done:
ob = env.reset()
if __name__ == '__main__':
main()


@@ -3,6 +3,16 @@
- Original paper: https://arxiv.org/abs/1707.06347
- Baselines blog post: https://blog.openai.com/openai-baselines-ppo/
## Examples
- `python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4` runs the algorithm for 40M frames = 10M timesteps on an Atari Pong. See help (`-h`) for more options.
- `python -m baselines.run --alg=ppo2 --env=Ant-v2 --num_timesteps=1e6` runs the algorithm for 1M frames on a Mujoco Ant environment.
- also refer to the repo-wide [README.md](../../README.md#training-models)
### RNN networks
- `python -m baselines.run --alg=ppo2 --env=PongNoFrameskip-v4 --network=ppo_cnn_lstm` runs on an Atari Pong with
`ppo_cnn_lstm` network.
- `python -m baselines.run --alg=ppo2 --env=Ant-v2 --num_timesteps=1e6 --network=ppo_lstm --value_network=copy`
runs on a Mujoco Ant environment with `ppo_lstm` network whose value and policy networks are separated, but have
the same structure.
## See Also
- refer to the repo-wide [README.md](../../README.md#training-models)


@@ -0,0 +1 @@
from baselines.ppo2.layers import ppo_lstm, ppo_cnn_lstm, ppo_cnn_lnlstm # pylint: disable=unused-import # noqa: F401


@@ -18,7 +18,7 @@ def atari():
lam=0.95, gamma=0.99, noptepochs=4, log_interval=1,
ent_coef=.01,
lr=lambda f : f * 2.5e-4,
cliprange=lambda f : f * 0.1,
cliprange=0.1,
)
def retro():

baselines/ppo2/layers.py (new file, 55 lines)

@@ -0,0 +1,55 @@
import numpy as np
import tensorflow as tf
from baselines.a2c.utils import ortho_init, lstm, lnlstm
from baselines.common.models import register, nature_cnn
class RNN(object):
def __init__(self, func, memory_size=None):
self._func = func
self.memory_size = memory_size
def __call__(self, *args, **kwargs):
return self._func(*args, **kwargs)
@register("ppo_lstm")
def ppo_lstm(num_units=128, layer_norm=False):
def network_fn(input, mask, state):
input = tf.layers.flatten(input)
mask = tf.to_float(mask)
if layer_norm:
h, next_state = lnlstm([input], [mask[:, None]], state, scope='lnlstm', nh=num_units)
else:
h, next_state = lstm([input], [mask[:, None]], state, scope='lstm', nh=num_units)
h = h[0]
return h, next_state
return RNN(network_fn, memory_size=num_units * 2)
@register("ppo_cnn_lstm")
def ppo_cnn_lstm(num_units=128, layer_norm=False, **conv_kwargs):
def network_fn(input, mask, state):
mask = tf.to_float(mask)
initializer = ortho_init(np.sqrt(2))
h = nature_cnn(input, **conv_kwargs)
h = tf.layers.flatten(h)
h = tf.layers.dense(h, units=512, activation=tf.nn.relu, kernel_initializer=initializer)
if layer_norm:
h, next_state = lnlstm([h], [mask[:, None]], state, scope='lnlstm', nh=num_units)
else:
h, next_state = lstm([h], [mask[:, None]], state, scope='lstm', nh=num_units)
h = h[0]
return h, next_state
return RNN(network_fn, memory_size=num_units * 2)
@register("ppo_cnn_lnlstm")
def ppo_cnn_lnlstm(num_units=128, **conv_kwargs):
return ppo_cnn_lstm(num_units, layer_norm=True, **conv_kwargs)
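
A sketch (not part of the diff) of how the registered builders are retrieved; importing the ppo2 package presumably runs the one-line import shown above, which triggers the @register calls.

import baselines.ppo2  # noqa: F401 -- importing registers ppo_lstm / ppo_cnn_lstm / ppo_cnn_lnlstm
from baselines.common.models import get_network_builder

rnn = get_network_builder('ppo_lstm')(num_units=64)
print(type(rnn).__name__, rnn.memory_size)  # RNN 128 -- cell + hidden state, 2 * num_units floats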


@@ -1,42 +1,47 @@
import tensorflow as tf
import numpy as np
import tensorflow as tf
from baselines.ppo2.model import Model
class MicrobatchedModel(Model):
"""
Model that does training one microbatch at a time - when gradient computation
on the entire minibatch causes some overflow
"""
def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size):
nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size):
self.nmicrobatches = nbatch_train // microbatch_size
self.microbatch_size = microbatch_size
assert nbatch_train % microbatch_size == 0, 'microbatch_size ({}) should divide nbatch_train ({}) evenly'.format(microbatch_size, nbatch_train)
assert nbatch_train % microbatch_size == 0, 'microbatch_size ({}) should divide nbatch_train ({}) evenly'.format(
microbatch_size, nbatch_train)
super().__init__(
policy=policy,
ob_space=ob_space,
ac_space=ac_space,
nbatch_act=nbatch_act,
nbatch_train=microbatch_size,
nsteps=nsteps,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm)
policy=policy,
ob_space=ob_space,
ac_space=ac_space,
nbatch_act=nbatch_act,
nbatch_train=microbatch_size,
nsteps=nsteps,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm)
self.grads_ph = [tf.placeholder(dtype=g.dtype, shape=g.shape) for g in self.grads]
grads_ph_and_vars = list(zip(self.grads_ph, self.var))
self._apply_gradients_op = self.trainer.apply_gradients(grads_ph_and_vars)
def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
assert states is None, "microbatches with recurrent models are not supported yet"
# Here we calculate advantage A(s,a) = R + yV(s') - V(s)
# Returns = R + yV(s')
advs = returns - values
def train(self,
lr,
cliprange,
observations,
advs,
returns,
actions,
values,
neglogpacs,
**_kwargs):
# Normalize the advantages
advs = (advs - advs.mean()) / (advs.std() + 1e-8)
@@ -44,19 +49,24 @@ class MicrobatchedModel(Model):
stats_vs = []
for microbatch_idx in range(self.nmicrobatches):
_sli = range(microbatch_idx * self.microbatch_size, (microbatch_idx+1) * self.microbatch_size)
_sli = range(microbatch_idx * self.microbatch_size, (microbatch_idx + 1) * self.microbatch_size)
td_map = {
self.train_model.X: obs[_sli],
self.A:actions[_sli],
self.ADV:advs[_sli],
self.R:returns[_sli],
self.CLIPRANGE:cliprange,
self.OLDNEGLOGPAC:neglogpacs[_sli],
self.OLDVPRED:values[_sli]
self.train_model.X: observations[_sli],
self.A: actions[_sli],
self.ADV: advs[_sli],
self.RETURNS: returns[_sli],
self.LR: lr,
self.CLIPRANGE: cliprange,
self.OLDNEGLOGPAC: neglogpacs[_sli],
self.VALUE_PREV: values[_sli],
}
sliced_kwargs = {key: _kwargs[key][_sli] for key in _kwargs}
td_map.update(self.train_model.feed_dict(**sliced_kwargs))
# Compute gradient on a microbatch (note that variables do not change here) ...
grad_v, stats_v = self.sess.run([self.grads, self.stats_list], td_map)
grad_v, stats_v = self.sess.run([self.grads, self.stats_list], td_map)
if microbatch_idx == 0:
sum_grad_v = grad_v
else:
@@ -71,6 +81,3 @@ class MicrobatchedModel(Model):
self.sess.run(self._apply_gradients_op, feed_dict)
# Return average of the stats
return np.mean(np.array(stats_vs), axis=0).tolist()
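
A usage sketch, not from the diff (assuming the class lives in baselines.ppo2.microbatched_model and that gym's CartPole-v0 is available): the microbatched model can be plugged in through ppo2.learn's model_fn hook so each gradient is accumulated over small slices of the training minibatch.

from functools import partial

import gym
from baselines.common.vec_env import DummyVecEnv
from baselines.ppo2 import ppo2
from baselines.ppo2.microbatched_model import MicrobatchedModel  # assumed module path

venv = DummyVecEnv([lambda: gym.make('CartPole-v0')])
# nbatch_train = (1 env * 32 steps) / 2 minibatches = 16, and 4 divides 16 evenly
model_fn = partial(MicrobatchedModel, microbatch_size=4)
model = ppo2.learn(network='mlp', env=venv, nsteps=32, nminibatches=2,
                   total_timesteps=64, model_fn=model_fn)
venv.close()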


@@ -1,8 +1,8 @@
import tensorflow as tf
import functools
import tensorflow as tf
from baselines.common.tf_util import get_session, save_variables, load_variables
from baselines.common.tf_util import initialize
try:
from baselines.common.mpi_adam_optimizer import MpiAdamOptimizer
@@ -11,6 +11,7 @@ try:
except ImportError:
MPI = None
class Model(object):
"""
We use this object to :
@@ -24,133 +25,157 @@ class Model(object):
save/load():
- Save load the model
"""
def __init__(self, *, policy, ob_space, ac_space, nbatch_act, nbatch_train,
nsteps, ent_coef, vf_coef, max_grad_norm, microbatch_size=None):
self.sess = sess = get_session()
nsteps, ent_coef, vf_coef, max_grad_norm,
name='ppo_model',
sess=None,
microbatch_size=None):
if sess is None:
sess = get_session()
self.sess = sess
self.name = name
with tf.variable_scope('ppo2_model', reuse=tf.AUTO_REUSE):
# CREATE OUR TWO MODELS
# act_model that is used for sampling
act_model = policy(nbatch_act, 1, sess)
with tf.variable_scope(name) as scope:
self.scope = scope
with tf.variable_scope('models', reuse=tf.AUTO_REUSE):
with tf.name_scope('act_model'):
# CREATE OUR TWO MODELS
# act_model that is used for sampling
act_model = policy(nbatch_act, 1, sess)
# Train model for training
if microbatch_size is None:
train_model = policy(nbatch_train, nsteps, sess)
else:
train_model = policy(microbatch_size, nsteps, sess)
with tf.name_scope('train_model'):
# Train model for training
if microbatch_size is None:
train_model = policy(nbatch_train, nsteps, sess)
else:
train_model = policy(microbatch_size, nsteps, sess)
# CREATE THE PLACEHOLDERS
self.A = A = train_model.pdtype.sample_placeholder([None])
self.ADV = ADV = tf.placeholder(tf.float32, [None])
self.R = R = tf.placeholder(tf.float32, [None])
# Keep track of old actor
self.OLDNEGLOGPAC = OLDNEGLOGPAC = tf.placeholder(tf.float32, [None])
# Keep track of old critic
self.OLDVPRED = OLDVPRED = tf.placeholder(tf.float32, [None])
self.LR = LR = tf.placeholder(tf.float32, [])
# Cliprange
self.CLIPRANGE = CLIPRANGE = tf.placeholder(tf.float32, [])
with tf.variable_scope('losses'):
# CREATE THE PLACEHOLDERS
self.A = A = train_model.pdtype.sample_placeholder([None], name='action')
self.ADV = ADV = tf.placeholder(tf.float32, [None], name='advantage')
self.RETURNS = RETURNS = tf.placeholder(tf.float32, [None], name='reward')
self.VALUE_PREV = VALUE_PREV = tf.placeholder(tf.float32, [None], name='value_prev')
self.OLDNEGLOGPAC = OLDNEGLOGPAC = tf.placeholder(tf.float32, [None],
name='negative_log_p_action_old')
self.CLIPRANGE = CLIPRANGE = tf.placeholder(tf.float32, [], name='clip_range')
neglogpac = train_model.pd.neglogp(A)
with tf.name_scope("neglogpac"):
neglogpac = train_model.pd.neglogp(A)
# Calculate the entropy
# Entropy is used to improve exploration by limiting the premature convergence to suboptimal policy.
entropy = tf.reduce_mean(train_model.pd.entropy())
with tf.name_scope("entropy"):
# Calculate the entropy
# Entropy is used to improve exploration by limiting the premature convergence to suboptimal policy.
entropy = tf.reduce_mean(train_model.pd.entropy())
entropy_loss = (- ent_coef) * entropy
# CALCULATE THE LOSS
# Total loss = Policy gradient loss - entropy * entropy coefficient + Value coefficient * value loss
with tf.name_scope("value_loss"):
# CALCULATE THE LOSS
value = train_model.value
value_clipped = VALUE_PREV + tf.clip_by_value(value - VALUE_PREV, -CLIPRANGE, CLIPRANGE)
vf_losses1 = tf.squared_difference(value, RETURNS)
vf_losses2 = tf.squared_difference(value_clipped, RETURNS)
vf_loss = 0.5 * vf_coef * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
# Clip the value to reduce variability during Critic training
# Get the predicted value
vpred = train_model.vf
vpredclipped = OLDVPRED + tf.clip_by_value(train_model.vf - OLDVPRED, - CLIPRANGE, CLIPRANGE)
# Unclipped value
vf_losses1 = tf.square(vpred - R)
# Clipped value
vf_losses2 = tf.square(vpredclipped - R)
with tf.name_scope("policy_loss"):
# Calculate ratio (pi current policy / pi old policy)
ratio = tf.exp(OLDNEGLOGPAC - neglogpac)
pg_losses = -ADV * ratio
pg_losses2 = -ADV * tf.clip_by_value(ratio, 1.0 - CLIPRANGE, 1.0 + CLIPRANGE)
pg_loss = tf.reduce_mean(tf.maximum(pg_losses, pg_losses2))
vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
with tf.name_scope("approxkl"):
approxkl = .5 * tf.reduce_mean(tf.squared_difference(neglogpac, OLDNEGLOGPAC))
# Calculate ratio (pi current policy / pi old policy)
ratio = tf.exp(OLDNEGLOGPAC - neglogpac)
with tf.name_scope("clip_fraction"):
clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE)))
# Defining Loss = - J is equivalent to max J
pg_losses = -ADV * ratio
with tf.name_scope("total_loss"):
loss = pg_loss + entropy_loss + vf_loss
pg_losses2 = -ADV * tf.clip_by_value(ratio, 1.0 - CLIPRANGE, 1.0 + CLIPRANGE)
with tf.variable_scope('optimizer'):
self.LR = LR = tf.placeholder(tf.float32, [], name='learning_rate')
# Final PG loss
pg_loss = tf.reduce_mean(tf.maximum(pg_losses, pg_losses2))
approxkl = .5 * tf.reduce_mean(tf.square(neglogpac - OLDNEGLOGPAC))
clipfrac = tf.reduce_mean(tf.to_float(tf.greater(tf.abs(ratio - 1.0), CLIPRANGE)))
# UPDATE THE PARAMETERS USING LOSS
# 1. Get the model parameters
params = tf.trainable_variables(self.scope.name)
# Total loss
loss = pg_loss - entropy * ent_coef + vf_loss * vf_coef
# 2. Build our trainer
if MPI is not None:
self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
else:
self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients
grads_and_var = self.trainer.compute_gradients(loss, params)
grads, var = zip(*grads_and_var)
# UPDATE THE PARAMETERS USING LOSS
# 1. Get the model parameters
params = tf.trainable_variables('ppo2_model')
# 2. Build our trainer
if MPI is not None:
self.trainer = MpiAdamOptimizer(MPI.COMM_WORLD, learning_rate=LR, epsilon=1e-5)
else:
self.trainer = tf.train.AdamOptimizer(learning_rate=LR, epsilon=1e-5)
# 3. Calculate the gradients
grads_and_var = self.trainer.compute_gradients(loss, params)
grads, var = zip(*grads_and_var)
if max_grad_norm is not None:
# Clip the gradients (normalize)
grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm)
grads_and_var = list(zip(grads, var))
if max_grad_norm is not None:
# Clip the gradients (normalize)
grads, _grad_norm = tf.clip_by_global_norm(grads, max_grad_norm)
grads_and_var = list(zip(grads, var))
# zip aggregate each gradient with parameters associated
# For instance zip(ABCD, xyza) => Ax, By, Cz, Da
self.grads = grads
self.var = var
self._train_op = self.trainer.apply_gradients(grads_and_var)
self.grads = grads
self.var = var
self._train_op = self.trainer.apply_gradients(grads_and_var)
self.loss_names = ['policy_loss', 'value_loss', 'policy_entropy', 'approxkl', 'clipfrac']
self.stats_list = [pg_loss, vf_loss, entropy, approxkl, clipfrac]
self.loss_names = ['policy_loss', 'value_loss', 'entropy_loss', 'approxkl', 'clipfrac',
'total_loss']
self.stats_list = [pg_loss, vf_loss, entropy_loss, approxkl, clipfrac, loss]
self.train_model = train_model
self.act_model = act_model
self.initial_state = act_model.initial_state
self.train_model = train_model
self.act_model = act_model
self.step = act_model.step
self.value = act_model.value
self.initial_state = act_model.initial_state
self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess)
self.save = functools.partial(save_variables, sess=sess)
self.load = functools.partial(load_variables, sess=sess)
with tf.variable_scope('initialization'):
sess.run(tf.initializers.variables(tf.global_variables(self.scope.name)))
sess.run(tf.initializers.variables(tf.local_variables(self.scope.name)))
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=self.scope.name)
if MPI is not None:
sync_from_root(sess, global_variables) # pylint: disable=E1101
initialize()
global_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="")
if MPI is not None:
sync_from_root(sess, global_variables) #pylint: disable=E1101
def step_with_dict(self, **kwargs):
return self.act_model.step(**kwargs)
def train(self, lr, cliprange, obs, returns, masks, actions, values, neglogpacs, states=None):
# Here we calculate advantage A(s,a) = R + yV(s') - V(s)
# Returns = R + yV(s')
advs = returns - values
def step(self, obs, M=None, S=None, **kwargs):
kwargs.update({'observations': obs})
if M is not None and S is not None:
kwargs.update({'dones': M})
kwargs.update({'states': S})
transition = self.act_model.step(**kwargs)
states = transition['next_states'] if 'next_states' in transition else None
return transition['actions'], transition['values'], states, transition['neglogpacs']
def train(self,
lr,
cliprange,
observations,
advs,
returns,
actions,
values,
neglogpacs,
**_kwargs):
# Normalize the advantages
advs = (advs - advs.mean()) / (advs.std() + 1e-8)
td_map = {
self.train_model.X : obs,
self.A : actions,
self.ADV : advs,
self.R : returns,
self.LR : lr,
self.CLIPRANGE : cliprange,
self.OLDNEGLOGPAC : neglogpacs,
self.OLDVPRED : values
self.train_model.X: observations,
self.A: actions,
self.ADV: advs,
self.RETURNS: returns,
self.LR: lr,
self.CLIPRANGE: cliprange,
self.OLDNEGLOGPAC: neglogpacs,
self.VALUE_PREV: values,
}
if states is not None:
td_map[self.train_model.S] = states
td_map[self.train_model.M] = masks
td_map.update(self.train_model.feed_dict(**_kwargs))
return self.sess.run(
self.stats_list + [self._train_op],
td_map
)[:-1]
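
A sketch of the two stepping interfaces after this change (assuming gym's CartPole-v0; the model is built through ppo2.learn with zero timesteps so no training runs).

import gym
import numpy as np
from baselines.common.vec_env import DummyVecEnv
from baselines.ppo2 import ppo2

venv = DummyVecEnv([lambda: gym.make('CartPole-v0')])
model = ppo2.learn(network='mlp', env=venv, total_timesteps=0)   # build the graph, skip training

obs = venv.reset()
actions, values, states, neglogpacs = model.step(obs)            # tuple interface kept for compatibility
out = model.step_with_dict(observations=obs, dones=np.zeros(1))  # dict interface used by the new Runner
print(actions, sorted(out.keys()))                               # e.g. [0] ['actions', 'neglogpacs', 'values']
venv.close()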

baselines/ppo2/policies.py (new file, 188 lines)

@@ -0,0 +1,188 @@
import gym
import numpy as np
import tensorflow as tf
from baselines.a2c.utils import fc
from baselines.common import tf_util
from baselines.common.distributions import make_pdtype
from baselines.common.input import observation_placeholder, encode_observation
from baselines.common.models import get_network_builder
from baselines.common.tf_util import adjust_shape
from baselines.ppo2.layers import RNN
class PolicyWithValue(object):
"""
Encapsulates fields and methods for RL policy and value function estimation with shared parameters
"""
def __init__(self, env, observations, latent, dones, states=None, estimate_q=False, vf_latent=None, sess=None):
"""
Parameters:
----------
env RL environment
observations tensorflow placeholder in which the observations will be fed
latent latent state from which policy distribution parameters should be inferred
vf_latent latent state from which value function should be inferred (if None, then latent is used)
sess tensorflow session to run calculations in (if None, default session is used)
dones tensorflow placeholder for episode-termination flags (used as the RNN mask)
states dict with 'current' and 'next' RNN state tensors, or None for feedforward policies
estimate_q if True, estimate Q-values (one per discrete action) instead of a single state value
"""
self.X = observations
self.dones = dones
self.pdtype = make_pdtype(env.action_space)
self.states = states
self.sess = sess or tf.get_default_session()
vf_latent = vf_latent if vf_latent is not None else latent
with tf.variable_scope('policy'):
latent = tf.layers.flatten(latent)
# Based on the action space, will select what probability distribution type
self.pd, self.pi = self.pdtype.pdfromlatent(latent, init_scale=0.01)
with tf.variable_scope('sample_action'):
self.action = self.pd.sample()
with tf.variable_scope('negative_log_probability'):
# Calculate the neg log of our probability
self.neglogp = self.pd.neglogp(self.action)
with tf.variable_scope('value'):
vf_latent = tf.layers.flatten(vf_latent)
if estimate_q:
assert isinstance(env.action_space, gym.spaces.Discrete)
self.q = fc(vf_latent, 'q', env.action_space.n)
self.value = self.q
else:
vf_latent = tf.layers.flatten(vf_latent)
self.value = fc(vf_latent, 'value', 1, init_scale=0.01)
self.value = self.value[:, 0]
self.step_input = {
'observations': observations,
'dones': self.dones,
}
self.step_output = {
'actions': self.action,
'values': self.value,
'neglogpacs': self.neglogp,
}
if self.states:
self.initial_state = np.zeros(self.states['current'].get_shape())
self.step_input.update({'states': self.states['current']})
self.step_output.update({'states': self.states['current'],
'next_states': self.states['next']})
else:
self.initial_state = None
def feed_dict(self, **kwargs):
feed_dict = {}
for key in kwargs:
if key in self.step_input:
feed_dict[self.step_input[key]] = adjust_shape(self.step_input[key], kwargs[key])
return feed_dict
def step(self, **kwargs):
return self.sess.run(self.step_output,
feed_dict=self.feed_dict(**kwargs))
def values(self, **kwargs):
return self.sess.run({'values': self.value},
feed_dict=self.feed_dict(**kwargs))
def save(self, save_path):
tf_util.save_state(save_path, sess=self.sess)
def load(self, load_path):
tf_util.load_state(load_path, sess=self.sess)
def build_ppo_policy(env, policy_network, value_network=None, estimate_q=False, **policy_kwargs):
if isinstance(policy_network, str):
network_type = policy_network
policy_network = get_network_builder(network_type)(**policy_kwargs)
if value_network is None:
value_network = 'shared'
def policy_fn(nbatch=None, nsteps=None, sess=None, observ_placeholder=None):
next_states_list = []
state_map = {}
state_placeholder = None
ob_space = env.observation_space
X = observ_placeholder if observ_placeholder is not None else observation_placeholder(ob_space,
batch_size=nbatch)
dones = tf.placeholder(tf.float32, shape=[X.shape[0]], name='dones')
encoded_x = encode_observation(ob_space, X)
with tf.variable_scope('current_rnn_memory'):
if value_network == 'shared':
value_network_ = value_network
else:
if value_network == 'copy':
value_network_ = policy_network
else:
assert callable(value_network)
value_network_ = value_network
policy_memory_size = policy_network.memory_size if isinstance(policy_network, RNN) else 0
value_memory_size = value_network_.memory_size if isinstance(value_network_, RNN) else 0
state_size = policy_memory_size + value_memory_size
if state_size > 0:
state_placeholder = tf.placeholder(dtype=tf.float32, shape=(nbatch, state_size),
name='states')
state_map['policy'] = state_placeholder[:, 0:policy_memory_size]
state_map['value'] = state_placeholder[:, policy_memory_size:]
with tf.variable_scope('policy_latent', reuse=tf.AUTO_REUSE):
if isinstance(policy_network, RNN):
assert policy_memory_size > 0
policy_latent, next_policy_state = \
policy_network(encoded_x, dones, state_map['policy'])
next_states_list.append(next_policy_state)
else:
policy_latent = policy_network(encoded_x)
with tf.variable_scope('value_latent', reuse=tf.AUTO_REUSE):
if value_network_ == 'shared':
value_latent = policy_latent
elif isinstance(value_network_, RNN):
assert value_memory_size > 0
value_latent, next_value_state = \
value_network_(encoded_x, dones, state_map['value'])
next_states_list.append(next_value_state)
else:
value_latent = value_network_(encoded_x)
with tf.name_scope("next_rnn_memory"):
if state_size > 0:
next_states = tf.concat(next_states_list, axis=1)
state_info = {'current': state_placeholder,
'next': next_states, }
else:
state_info = None
policy = PolicyWithValue(
env=env,
observations=X,
dones=dones,
latent=policy_latent,
vf_latent=value_latent,
states=state_info,
sess=sess,
estimate_q=estimate_q,
)
return policy
return policy_fn


@@ -1,28 +1,35 @@
import os
import time
import numpy as np
import os.path as osp
from baselines import logger
import time
from collections import deque
from baselines.common import explained_variance, set_global_seeds
from baselines.common.policies import build_policy
import numpy as np
import tensorflow as tf
from baselines import logger
from baselines.common import explained_variance
from baselines.common import set_global_seeds
from baselines.common.tf_util import display_var_info
from baselines.ppo2.policies import build_ppo_policy
from baselines.ppo2.runner import Runner
try:
from mpi4py import MPI
except ImportError:
MPI = None
from baselines.ppo2.runner import Runner
def constfn(val):
def f(_):
return val
return f
def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2048, ent_coef=0.0, lr=3e-4,
vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
save_interval=0, load_path=None, model_fn=None, **network_kwargs):
'''
def learn(*, network, env, total_timesteps, eval_env=None, seed=None, nsteps=128, ent_coef=0.0, lr=3e-4,
vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95,
log_interval=10, nminibatches=4, noptepochs=4, cliprange=0.2,
save_interval=10, load_path=None, model_fn=None, **network_kwargs):
"""
Learn policy using PPO algorithm (https://arxiv.org/abs/1707.06347)
Parameters:
@@ -52,7 +59,7 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
max_grad_norm: float or None gradient norm clipping coefficient
gamma: float discounting factor
gamma: float discounting factor for rewards
lam: float advantage estimation discounting factor (lambda in the paper)
@@ -72,20 +79,21 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
**network_kwargs: keyword arguments to the policy / network builder. See baselines.common/policies.py/build_policy and arguments to a particular type of network
For instance, 'mlp' network architecture has arguments num_hidden and num_layers.
'''
"""
set_global_seeds(seed)
if isinstance(lr, float): lr = constfn(lr)
else: assert callable(lr)
if isinstance(cliprange, float): cliprange = constfn(cliprange)
else: assert callable(cliprange)
if isinstance(lr, float):
lr = constfn(lr)
else:
assert callable(lr)
if isinstance(cliprange, float):
cliprange = constfn(cliprange)
else:
assert callable(cliprange)
total_timesteps = int(total_timesteps)
policy = build_policy(env, network, **network_kwargs)
policy = build_ppo_policy(env, network, **network_kwargs)
# Get the nb of env
nenvs = env.num_envs
@@ -104,108 +112,111 @@ def learn(*, network, env, total_timesteps, eval_env = None, seed=None, nsteps=2
model_fn = Model
model = model_fn(policy=policy, ob_space=ob_space, ac_space=ac_space, nbatch_act=nenvs, nbatch_train=nbatch_train,
nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef,
max_grad_norm=max_grad_norm)
nsteps=nsteps, ent_coef=ent_coef, vf_coef=vf_coef, max_grad_norm=max_grad_norm)
if load_path is not None:
model.load(load_path)
allvars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=model.name)
display_var_info(allvars)
# Instantiate the runner object
runner = Runner(env=env, model=model, nsteps=nsteps, gamma=gamma, lam=lam)
runner = Runner(env=env, model=model, nsteps=nsteps, gamma=gamma, ob_space=ob_space, lam=lam)
if eval_env is not None:
eval_runner = Runner(env = eval_env, model = model, nsteps = nsteps, gamma = gamma, lam= lam)
eval_runner = Runner(env=eval_env, model=model, nsteps=nsteps, gamma=gamma, ob_space=ob_space, lam=lam)
epinfobuf = deque(maxlen=100)
if eval_env is not None:
eval_epinfobuf = deque(maxlen=100)
# Start total timer
tfirststart = time.time()
tfirststart = time.perf_counter()
nupdates = total_timesteps // nbatch
nupdates = total_timesteps//nbatch
for update in range(1, nupdates+1):
for update in range(1, nupdates + 1):
assert nbatch % nminibatches == 0
# Start timer
tstart = time.time()
tstart = time.perf_counter()
frac = 1.0 - (update - 1.0) / nupdates
# Calculate the learning rate
lrnow = lr(frac)
# Calculate the cliprange
cliprangenow = cliprange(frac)
# Get minibatch
obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632
if eval_env is not None:
eval_obs, eval_returns, eval_masks, eval_actions, eval_values, eval_neglogpacs, eval_states, eval_epinfos = eval_runner.run() #pylint: disable=E0632
epinfobuf.extend(epinfos)
# Get minibatch
minibatch = runner.run()
if eval_env is not None:
eval_minibatch = eval_runner.run()
_eval_obs = eval_minibatch['observations'] # noqa: F841
_eval_returns = eval_minibatch['returns'] # noqa: F841
_eval_masks = eval_minibatch['masks'] # noqa: F841
_eval_actions = eval_minibatch['actions'] # noqa: F841
_eval_values = eval_minibatch['values'] # noqa: F841
_eval_neglogpacs = eval_minibatch['neglogpacs'] # noqa: F841
_eval_states = eval_minibatch['state'] # noqa: F841
eval_epinfos = eval_minibatch['epinfos']
epinfobuf.extend(minibatch.pop('epinfos'))
if eval_env is not None:
eval_epinfobuf.extend(eval_epinfos)
# Here what we're going to do is for each minibatch calculate the loss and append it.
mblossvals = []
if states is None: # nonrecurrent version
# Index of each element of batch_size
# Create the indices array
inds = np.arange(nbatch)
for _ in range(noptepochs):
# Randomize the indexes
np.random.shuffle(inds)
# 0 to batch_size with batch_train_size step
for start in range(0, nbatch, nbatch_train):
end = start + nbatch_train
mbinds = inds[start:end]
slices = (arr[mbinds] for arr in (obs, returns, masks, actions, values, neglogpacs))
mblossvals.append(model.train(lrnow, cliprangenow, *slices))
else: # recurrent version
assert nenvs % nminibatches == 0
envsperbatch = nenvs // nminibatches
envinds = np.arange(nenvs)
flatinds = np.arange(nenvs * nsteps).reshape(nenvs, nsteps)
envsperbatch = nbatch_train // nsteps
for _ in range(noptepochs):
np.random.shuffle(envinds)
for start in range(0, nenvs, envsperbatch):
end = start + envsperbatch
mbenvinds = envinds[start:end]
mbflatinds = flatinds[mbenvinds].ravel()
slices = (arr[mbflatinds] for arr in (obs, returns, masks, actions, values, neglogpacs))
mbstates = states[mbenvinds]
mblossvals.append(model.train(lrnow, cliprangenow, *slices, mbstates))
# Create the array of indices over the whole batch
inds = np.arange(nbatch)
for _ in range(noptepochs):
# Randomize the indexes
np.random.shuffle(inds)
# Iterate from 0 to nbatch in steps of nbatch_train
for start in range(0, nbatch, nbatch_train):
end = start + nbatch_train
mbinds = inds[start:end]
slices = {key: minibatch[key][mbinds] for key in minibatch}
mblossvals.append(model.train(lrnow, cliprangenow, **slices))
# Feedforward --> get losses --> update
lossvals = np.mean(mblossvals, axis=0)
# End timer
tnow = time.time()
tnow = time.perf_counter()
# Calculate fps (frames per second)
fps = int(nbatch / (tnow - tstart))
if update % log_interval == 0 or update == 1:
# Calculates whether the value function is a good predictor of the returns (ev close to 1)
# or whether it's just worse than predicting nothing (ev <= 0)
ev = explained_variance(values, returns)
logger.logkv("serial_timesteps", update*nsteps)
ev = explained_variance(minibatch['values'], minibatch['returns'])
logger.logkv("serial_timesteps", update * nsteps)
logger.logkv("nupdates", update)
logger.logkv("total_timesteps", update*nbatch)
logger.logkv("total_timesteps", update * nbatch)
logger.logkv("fps", fps)
logger.logkv("explained_variance", float(ev))
logger.logkv('eprewmean', safemean([epinfo['r'] for epinfo in epinfobuf]))
logger.logkv('eplenmean', safemean([epinfo['l'] for epinfo in epinfobuf]))
logger.logkv('rewards_per_step', safemean(minibatch['rewards']))
logger.logkv('advantages_per_step', safemean(minibatch['advs']))
if eval_env is not None:
logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]) )
logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]) )
logger.logkv('eval_eprewmean', safemean([epinfo['r'] for epinfo in eval_epinfobuf]))
logger.logkv('eval_eplenmean', safemean([epinfo['l'] for epinfo in eval_epinfobuf]))
logger.logkv('time_elapsed', tnow - tfirststart)
for (lossval, lossname) in zip(lossvals, model.loss_names):
logger.logkv(lossname, lossval)
if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
logger.dumpkvs()
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
if save_interval and (update % save_interval == 0 or update == 1) and logger.get_dir() and (
MPI is None or MPI.COMM_WORLD.Get_rank() == 0):
checkdir = osp.join(logger.get_dir(), 'checkpoints')
os.makedirs(checkdir, exist_ok=True)
savepath = osp.join(checkdir, '%.5i'%update)
savepath = osp.join(checkdir, '%.5i' % update)
print('Saving to', savepath)
model.save(savepath)
del minibatch
return model
# Avoid division errors when computing the mean (if the buffer is empty, return np.nan instead of raising an error)
def safemean(xs):
return np.nan if len(xs) == 0 else np.mean(xs)
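For reference, a minimal standalone sketch of the schedule-and-slice pattern used in the learn() loop above. Nothing here is the baselines Model class: linear_anneal, fake_train, and the tiny minibatch dict are made up for illustration. The point is that lr and cliprange are called with the remaining-progress fraction frac each update, and the dict-valued minibatch is sliced with shuffled indices once per optimization epoch.
import numpy as np
def constfn(val):
    # Wrap a constant so it can be called like an annealing schedule.
    return lambda _: val
def linear_anneal(val):
    # frac goes from ~1.0 down toward 0.0 over the course of training.
    return lambda frac: val * frac
nbatch, nbatch_train, noptepochs, nupdates = 8, 4, 2, 2
minibatch = {
    'observations': np.zeros((nbatch, 3), dtype=np.float32),
    'returns': np.zeros(nbatch, dtype=np.float32),
    'actions': np.zeros(nbatch, dtype=np.float32),
}
lr, cliprange = linear_anneal(3e-4), constfn(0.2)
def fake_train(lrnow, cliprangenow, **slices):
    # Stand-in for model.train: just report the slice shapes it receives.
    return {k: v.shape for k, v in slices.items()}
for update in range(1, nupdates + 1):
    frac = 1.0 - (update - 1.0) / nupdates
    lrnow, cliprangenow = lr(frac), cliprange(frac)
    inds = np.arange(nbatch)
    for _ in range(noptepochs):
        np.random.shuffle(inds)
        for start in range(0, nbatch, nbatch_train):
            mbinds = inds[start:start + nbatch_train]
            slices = {key: minibatch[key][mbinds] for key in minibatch}
            fake_train(lrnow, cliprangenow, **slices)
def safemean(xs):
    # np.nan for an empty buffer instead of a division error.
    return np.nan if len(xs) == 0 else np.mean(xs)
print(safemean([]), safemean([1.0, 2.0]))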

Binary image file not shown (177 KiB)

Binary image file not shown (100 KiB)

Binary image file not shown (92 KiB)

View File

@@ -1,6 +1,8 @@
import numpy as np
from baselines.common.runners import AbstractEnvRunner
class Runner(AbstractEnvRunner):
"""
We use this object to make a mini batch of experiences
@@ -10,67 +12,118 @@ class Runner(AbstractEnvRunner):
run():
- Make a mini batch
"""
def __init__(self, *, env, model, nsteps, gamma, lam):
def __init__(self, *, env, model, nsteps, gamma, ob_space, lam):
super().__init__(env=env, model=model, nsteps=nsteps)
# Lambda used in GAE (Generalized Advantage Estimation)
self.lam = lam
# Discount rate
self.gamma = gamma
self.lam = lam # Lambda used in GAE (Generalized Advantage Estimation)
self.gamma = gamma # Discount rate for rewards
self.ob_space = ob_space
def run(self):
# Here, we initialize the containers that will hold the minibatch of experiences
mb_obs, mb_rewards, mb_actions, mb_values, mb_dones, mb_neglogpacs = [],[],[],[],[],[]
mb_states = self.states
minibatch = {
"observations": [],
"actions": [],
"rewards": [],
"values": [],
"dones": [],
"neglogpacs": [],
}
data_type = {
"observations": self.obs.dtype,
"actions": np.float32,
"rewards": np.float32,
"values": np.float32,
"dones": np.float32,
"neglogpacs": np.float32,
}
prev_transition = {'next_states': self.model.initial_state} if self.model.initial_state is not None else {}
epinfos = []
# For each step of the rollout
for _ in range(self.nsteps):
# Given observations, get actions, values, and neglogpacs
# We already have self.obs because the Runner superclass runs self.obs[:] = env.reset() on init
actions, values, self.states, neglogpacs = self.model.step(self.obs, S=self.states, M=self.dones)
mb_obs.append(self.obs.copy())
mb_actions.append(actions)
mb_values.append(values)
mb_neglogpacs.append(neglogpacs)
mb_dones.append(self.dones)
transitions = {}
transitions['observations'] = self.obs.copy()
transitions['dones'] = self.dones
if 'next_states' in prev_transition:
transitions['states'] = prev_transition['next_states']
transitions.update(self.model.step_with_dict(**transitions))
# Take actions in the env and observe the results
# infos contains a ton of useful information
self.obs[:], rewards, self.dones, infos = self.env.step(actions)
self.obs, transitions['rewards'], self.dones, infos = self.env.step(transitions['actions'])
self.dones = np.array(self.dones, dtype=np.float)
for info in infos:
maybeepinfo = info.get('episode')
if maybeepinfo: epinfos.append(maybeepinfo)
mb_rewards.append(rewards)
#batch of steps to batch of rollouts
mb_obs = np.asarray(mb_obs, dtype=self.obs.dtype)
mb_rewards = np.asarray(mb_rewards, dtype=np.float32)
mb_actions = np.asarray(mb_actions)
mb_values = np.asarray(mb_values, dtype=np.float32)
mb_neglogpacs = np.asarray(mb_neglogpacs, dtype=np.float32)
mb_dones = np.asarray(mb_dones, dtype=np.bool)
last_values = self.model.value(self.obs, S=self.states, M=self.dones)
if maybeepinfo:
epinfos.append(maybeepinfo)
# discount/bootstrap off value fn
mb_returns = np.zeros_like(mb_rewards)
mb_advs = np.zeros_like(mb_rewards)
lastgaelam = 0
for key in transitions:
if key not in minibatch:
minibatch[key] = []
minibatch[key].append(transitions[key])
prev_transition = transitions
for key in minibatch:
dtype = data_type[key] if key in data_type else np.float
minibatch[key] = np.array(minibatch[key], dtype=dtype)
transitions['observations'] = self.obs.copy()
transitions['dones'] = self.dones
if 'states' in transitions:
transitions['states'] = transitions.pop('next_states')
for key in minibatch:
dtype = data_type[key] if key in data_type else np.float
minibatch[key] = np.asarray(minibatch[key], dtype=dtype)
last_values = self.model.step_with_dict(**transitions)['values']
# Calculate returns and advantages.
minibatch['advs'], minibatch['returns'] = \
self.advantage_and_returns(values=minibatch['values'],
rewards=minibatch['rewards'],
dones=minibatch['dones'],
last_values=last_values,
last_dones=self.dones,
gamma=self.gamma)
for key in minibatch:
minibatch[key] = sf01(minibatch[key])
minibatch['epinfos'] = epinfos
return minibatch
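To make the dict-based rollout above easier to follow, here is a toy standalone version. DummyModel and its step_with_dict are stand-ins for the real policy model, not the baselines API: each step builds a transitions dict, the model call fills in actions/values/neglogpacs (an RNN policy would also return next_states), and every key is appended into the minibatch, giving (nsteps, nenvs, ...) arrays before sf01 flattens them.
import numpy as np
nenvs, nsteps = 2, 3
class DummyModel:
    initial_state = None  # no RNN state in this toy example
    def step_with_dict(self, observations, dones, **kwargs):
        batch = observations.shape[0]
        return {
            'actions': np.zeros(batch, dtype=np.float32),
            'values': np.random.randn(batch).astype(np.float32),
            'neglogpacs': np.zeros(batch, dtype=np.float32),
        }
model = DummyModel()
obs = np.zeros((nenvs, 4), dtype=np.float32)
dones = np.zeros(nenvs, dtype=np.float32)
minibatch = {}
for _ in range(nsteps):
    transitions = {'observations': obs.copy(), 'dones': dones}
    transitions.update(model.step_with_dict(**transitions))
    transitions['rewards'] = np.ones(nenvs, dtype=np.float32)  # pretend env.step result
    for key, value in transitions.items():
        minibatch.setdefault(key, []).append(value)
minibatch = {k: np.asarray(v) for k, v in minibatch.items()}
print({k: v.shape for k, v in minibatch.items()})  # every key: (nsteps, nenvs, ...)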
def advantage_and_returns(self, values, rewards, dones, last_values, last_dones, gamma,
use_non_episodic_rewards=False):
"""
Calculate Generalized Advantage Estimation (GAE), https://arxiv.org/abs/1506.02438
see also Proximal Policy Optimization Algorithms, https://arxiv.org/abs/1707.06347
"""
advantages = np.zeros_like(rewards)
lastgaelam = 0  # running GAE accumulator (Generalized Advantage Estimation)
for t in reversed(range(self.nsteps)):
if t == self.nsteps - 1:
nextnonterminal = 1.0 - self.dones
nextvalues = last_values
if not use_non_episodic_rewards:
if t == self.nsteps - 1:
next_non_terminal = 1.0 - last_dones
else:
next_non_terminal = 1.0 - dones[t + 1]
else:
nextnonterminal = 1.0 - mb_dones[t+1]
nextvalues = mb_values[t+1]
delta = mb_rewards[t] + self.gamma * nextvalues * nextnonterminal - mb_values[t]
mb_advs[t] = lastgaelam = delta + self.gamma * self.lam * nextnonterminal * lastgaelam
mb_returns = mb_advs + mb_values
return (*map(sf01, (mb_obs, mb_returns, mb_dones, mb_actions, mb_values, mb_neglogpacs)),
mb_states, epinfos)
# obs, returns, masks, actions, values, neglogpacs, states = runner.run()
next_non_terminal = 1.0
next_value = values[t + 1] if t < self.nsteps - 1 else last_values
delta = rewards[t] + gamma * next_value * next_non_terminal - values[t]
advantages[t] = lastgaelam = delta + gamma * self.lam * next_non_terminal * lastgaelam
returns = advantages + values
return advantages, returns
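A small numeric check of the recursion above, with made-up rewards and values for a single environment (gamma and lam here are just the common defaults): delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_{t+1}) - V(s_t), and adv_t = delta_t + gamma * lam * (1 - done_{t+1}) * adv_{t+1}.
import numpy as np
gamma, lam = 0.99, 0.95
rewards = np.array([[1.0], [0.0], [1.0]], dtype=np.float32)   # (nsteps, nenvs)
values  = np.array([[0.5], [0.4], [0.6]], dtype=np.float32)
dones   = np.array([[0.0], [0.0], [0.0]], dtype=np.float32)
last_values, last_dones = np.array([0.3], np.float32), np.array([0.0], np.float32)
nsteps = rewards.shape[0]
advantages = np.zeros_like(rewards)
lastgaelam = 0.0
for t in reversed(range(nsteps)):
    if t == nsteps - 1:
        next_non_terminal, next_value = 1.0 - last_dones, last_values
    else:
        next_non_terminal, next_value = 1.0 - dones[t + 1], values[t + 1]
    delta = rewards[t] + gamma * next_value * next_non_terminal - values[t]
    advantages[t] = lastgaelam = delta + gamma * lam * next_non_terminal * lastgaelam
returns = advantages + values
print(advantages.ravel(), returns.ravel())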
def sf01(arr):
"""
swap and then flatten axes 0 and 1
"""
s = arr.shape
return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])
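And a quick shape check for sf01 (the array contents are arbitrary): a (nsteps, nenvs, ...) rollout becomes a flat (nsteps * nenvs, ...) batch, with the two leading axes swapped first so that each environment's trajectory stays contiguous.
import numpy as np
arr = np.arange(2 * 3 * 4).reshape(2, 3, 4)   # (nsteps=2, nenvs=3, obs_dim=4)
flat = arr.swapaxes(0, 1).reshape(2 * 3, 4)   # (6, 4)
print(flat.shape)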

View File

@@ -25,10 +25,11 @@ def test_microbatches():
env_test = DummyVecEnv([env_fn])
sess_test = make_session(make_default=True, graph=tf.Graph())
learn_fn(env=env_test, model_fn=partial(MicrobatchedModel, microbatch_size=2))
# learn_fn(env=env_test)
vars_test = {v.name: sess_test.run(v) for v in tf.trainable_variables()}
for v in vars_ref:
np.testing.assert_allclose(vars_ref[v], vars_test[v], atol=1e-3)
np.testing.assert_allclose(vars_ref[v], vars_test[v], atol=3e-3)
if __name__ == '__main__':
test_microbatches()
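One plausible reason the comparison above is only expected to match up to a small tolerance: for an equal-size split and a mean loss, averaging per-microbatch gradients reproduces the full-batch gradient exactly in math, but float32 summation in a different order leaves tiny numerical differences. A standalone numpy illustration of the exact-in-math part (linear model and data are made up for the example):
import numpy as np
rng = np.random.RandomState(0)
X, y, w = rng.randn(8, 3), rng.randn(8), rng.randn(3)
def grad(Xb, yb, w):
    # Gradient of the mean squared error 0.5 * mean((Xb @ w - yb)**2) w.r.t. w.
    return Xb.T @ (Xb @ w - yb) / len(yb)
full = grad(X, y, w)
micro = np.mean([grad(X[i:i + 2], y[i:i + 2], w) for i in range(0, 8, 2)], axis=0)
print(np.allclose(full, micro))  # True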

View File

@@ -1,4 +1,5 @@
import sys
import re
import multiprocessing
import os.path as osp
import gym
@@ -6,15 +7,13 @@ from collections import defaultdict
import tensorflow as tf
import numpy as np
from baselines.common.vec_env import VecFrameStack, VecNormalize, VecEnv
from baselines.common.vec_env.vec_video_recorder import VecVideoRecorder
from baselines.common.vec_env.vec_frame_stack import VecFrameStack
from baselines.common.cmd_util import common_arg_parser, parse_unknown_args, make_vec_env, make_env
from baselines.common.tf_util import get_session
from baselines import logger
from importlib import import_module
from baselines.common.vec_env.vec_normalize import VecNormalize
try:
from mpi4py import MPI
except ImportError:
@@ -52,7 +51,7 @@ _game_envs['retro'] = {
def train(args, extra_args):
env_type, env_id = get_env_type(args.env)
env_type, env_id = get_env_type(args)
print('env_type: {}'.format(env_type))
total_timesteps = int(args.num_timesteps)
@@ -64,7 +63,7 @@ def train(args, extra_args):
env = build_env(args)
if args.save_video_interval != 0:
env = VecVideoRecorder(env, osp.join(logger.Logger.CURRENT.dir, "videos"), record_video_trigger=lambda x: x % args.save_video_interval == 0, video_length=args.save_video_length)
env = VecVideoRecorder(env, osp.join(logger.get_dir(), "videos"), record_video_trigger=lambda x: x % args.save_video_interval == 0, video_length=args.save_video_length)
if args.network:
alg_kwargs['network'] = args.network
@@ -91,7 +90,7 @@ def build_env(args):
alg = args.alg
seed = args.seed
env_type, env_id = get_env_type(args.env)
env_type, env_id = get_env_type(args)
if env_type in {'atari', 'retro'}:
if alg == 'deepq':
@@ -104,22 +103,27 @@ def build_env(args):
env = VecFrameStack(env, frame_stack_size)
else:
config = tf.ConfigProto(allow_soft_placement=True,
config = tf.ConfigProto(allow_soft_placement=True,
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1)
config.gpu_options.allow_growth = True
get_session(config=config)
config.gpu_options.allow_growth = True
get_session(config=config)
flatten_dict_observations = alg not in {'her'}
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)
flatten_dict_observations = alg not in {'her'}
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)
if env_type == 'mujoco':
env = VecNormalize(env)
if env_type == 'mujoco':
env = VecNormalize(env)
return env
def get_env_type(env_id):
def get_env_type(args):
env_id = args.env
if args.env_type is not None:
return args.env_type, env_id
# Re-parse the gym registry, since we could have new envs since last time.
for env in gym.envs.registry.all():
env_type = env._entry_point.split(':')[0].split('.')[-1]
@@ -134,6 +138,8 @@ def get_env_type(env_id):
if env_id in e:
env_type = g
break
if ':' in env_id:
env_type = re.sub(r':.*', '', env_id)
assert env_type is not None, 'env_id {} is not recognized in env types'.format(env_id, _game_envs.keys())
return env_type, env_id
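A quick illustration of the colon-separated env_id handling introduced above (the id string here is a hypothetical example): when the id is written as '<env_type>:<id>', the prefix before the colon is taken as the env type; plain ids fall through to the registry lookup.
import re
env_id = 'atari:BreakoutNoFrameskip-v4'   # hypothetical example id
env_type = re.sub(r':.*', '', env_id)
print(env_type)                           # 'atari'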
@@ -194,9 +200,6 @@ def main(args):
args, unknown_args = arg_parser.parse_known_args(args)
extra_args = parse_cmdline_kwargs(unknown_args)
if args.extra_import is not None:
import_module(args.extra_import)
if MPI is None or MPI.COMM_WORLD.Get_rank() == 0:
rank = 0
logger.configure()
@@ -205,7 +208,6 @@ def main(args):
rank = MPI.COMM_WORLD.Get_rank()
model, env = train(args, extra_args)
env.close()
if args.save_path is not None and rank == 0:
save_path = osp.expanduser(args.save_path)
@@ -213,26 +215,28 @@ def main(args):
if args.play:
logger.log("Running trained model")
env = build_env(args)
obs = env.reset()
state = model.initial_state if hasattr(model, 'initial_state') else None
dones = np.zeros((1,))
episode_rew = 0
while True:
if state is not None:
actions, _, state, _ = model.step(obs,S=state, M=dones)
else:
actions, _, _, _ = model.step(obs)
obs, _, done, _ = env.step(actions)
obs, rew, done, _ = env.step(actions)
episode_rew += rew[0] if isinstance(env, VecEnv) else rew
env.render()
done = done.any() if isinstance(done, np.ndarray) else done
if done:
print('episode_rew={}'.format(episode_rew))
episode_rew = 0
obs = env.reset()
env.close()
env.close()
return model

View File

@@ -120,7 +120,7 @@
<td>114.26</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>
@@ -152,7 +152,7 @@
<td>131.46</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>
@@ -184,7 +184,7 @@
<td>113.58</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>
@@ -216,7 +216,7 @@
<td>82.94</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>
@@ -248,7 +248,7 @@
<td>81.61</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>
@@ -280,7 +280,7 @@
<td>59.72</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>
@@ -312,7 +312,7 @@
<td>14.98</td>
<td>cbd21ef</td>
<td><a href=https://github.com/openai/baselines/commit/7bfbcf177eca8f46c0c0bfbb378e044539f5e061>7bfbcf1</a></td>
</tr>