2016-04-27 08:00:58 -07:00
import atexit
import logging
import json
import numpy as np
import os
import six
import sys
import threading
import weakref
from gym import error , version
from gym . monitoring import stats_recorder , video_recorder
2016-05-29 09:07:09 -07:00
from gym . utils import atomic_write , closer , seeding
2016-04-27 08:00:58 -07:00
# Module-level logger, named after this module.
logger = logging . getLogger ( __name__ )
# Prefix used for files written by the monitor; detect_monitor_files() and
# clear_monitor_files() rely on it to recognise (and delete) monitor output.
FILE_PREFIX = ' openaigym '
# Prefix of manifest files; detect_training_manifests() matches on it.
MANIFEST_PREFIX = FILE_PREFIX + ' .manifest '
def detect_training_manifests(training_dir):
    """Return the paths of all manifest files directly inside training_dir.

    A directory entry counts as a manifest when its name starts with
    MANIFEST_PREFIX followed by the separator used below. Each returned
    path is training_dir joined with the entry name.

    Args:
        training_dir (str): Directory to scan (not recursive).

    Returns:
        list: Joined paths, in the order os.listdir reported the entries.
    """
    manifest_names = [
        entry for entry in os.listdir(training_dir)
        if entry.startswith(MANIFEST_PREFIX + ' . ')
    ]
    return [os.path.join(training_dir, entry) for entry in manifest_names]
def detect_monitor_files(training_dir):
    """Return the paths of all monitor-owned files directly inside training_dir.

    A directory entry is considered monitor-owned when its name starts with
    FILE_PREFIX followed by the separator used below.

    Args:
        training_dir (str): Directory to scan (not recursive).

    Returns:
        list: training_dir joined with each matching entry name, in the
        order os.listdir reported the entries.
    """
    owned_names = [
        entry for entry in os.listdir(training_dir)
        if entry.startswith(FILE_PREFIX + ' . ')
    ]
    return [os.path.join(training_dir, entry) for entry in owned_names]
def clear_monitor_files(training_dir):
    """Delete every file in training_dir that detect_monitor_files() reports.

    Does nothing (and logs nothing) when there are no such files.

    Args:
        training_dir (str): Directory whose monitor files should be removed.
    """
    stale_paths = detect_monitor_files(training_dir)
    if not stale_paths:
        return

    logger.info(' Clearing %d monitor files from previous run (because force=True was provided) ', len(stale_paths))
    for stale_path in stale_paths:
        os.unlink(stale_path)
def capped_cubic_video_schedule(episode_id):
    """Default video schedule: perfect cubes, then every 1000th episode.

    Args:
        episode_id (int): Index of the episode about to be recorded.

    Returns:
        bool: For episode_id below 1000, True exactly when episode_id is a
        perfect cube (0, 1, 8, 27, ...). From 1000 onwards, True exactly
        when episode_id is a multiple of 1000.
    """
    if episode_id < 1000:
        # Round the floating-point cube root to the nearest integer and
        # check that cubing it gets us back to where we started.
        nearest_root = int(round(episode_id ** (1. / 3)))
        return nearest_root ** 3 == episode_id
    return episode_id % 1000 == 0
2016-05-29 13:56:38 -07:00
def disable_videos(episode_id):
    """Video schedule that never records.

    Monitor.start() substitutes this when the caller passes
    video_callable=False.

    Args:
        episode_id (int): Ignored.

    Returns:
        bool: Always False.
    """
    return False
2016-05-27 12:16:35 -07:00
# Registry of started monitors: Monitor.start() registers the monitor here and
# Monitor.close() unregisters it again.
monitor_closer = closer . Closer ( )
# This method gets used for a sanity check in scoreboard/api.py. It's
# not intended for use outside of the gym codebase.
2016-05-18 01:27:58 -07:00
def _open_monitors():
    """Return a list snapshot of everything currently registered with monitor_closer.

    Internal helper; not intended for use outside of the gym codebase.
    """
    registered = monitor_closer.closeables.values()
    return list(registered)
2016-04-27 08:00:58 -07:00
class Monitor(object):
    """A configurable monitor for your training runs.

    Every env has an attached monitor, which you can access as
    'env.monitor'. Simple usage is just to call 'monitor.start(dir)'
    to begin monitoring and 'monitor.close()' when training is
    complete. This will record stats and will periodically record a video.

    For finer-grained control over how often videos are collected, use the
    video_callable argument, e.g.
    'monitor.start(dir, video_callable=lambda count: count % 100 == 0)'
    to record every 100 episodes. ('count' is how many episodes have completed)

    Depending on the environment, video can slow down execution. You
    can also use 'monitor.configure(video_callable=lambda count: False)' to
    disable video.

    Monitor supports multiple threads and multiple processes writing
    to the same directory of training data. The data will later be
    joined by scoreboard.upload_training_data and on the server.

    The monitor only holds a weak reference to the env, so the caller must
    keep the env alive for as long as the monitor is in use.

    Args:
        env (gym.Env): The environment instance to monitor.

    Attributes:
        videos (list): (video_path, metadata_path) pairs of the recordings
            that were closed while functional.
        enabled (bool): True between start() and close().
        episode_id (int): Incremented once per reset while enabled.
        seeds: Whatever env.seed() returned in start(); None before that.
    """

    def __init__(self, env):
        # Python's GC allows refcycles *or* for objects to have a
        # __del__ method. So we need to maintain a weakref to env.
        #
        # https://docs.python.org/2/library/gc.html#gc.garbage
        self._env_ref = weakref.ref(env)
        self.videos = []

        self.stats_recorder = None
        self.video_recorder = None
        self.enabled = False
        self.episode_id = 0
        self._monitor_id = None
        self.seeds = None

    @property
    def env(self):
        """The monitored env.

        Raises:
            error.Error: If the env has already been garbage collected.
        """
        env = self._env_ref()
        if env is None:
            raise error.Error(" env has been garbage collected. To keep using a monitor, you must keep around a reference to the env object. (HINT: try assigning the env to a variable in your code.) ")
        return env

    def start(self, directory, video_callable=None, force=False, resume=False, seed=None, write_upon_reset=False):
        """Start monitoring.

        Args:
            directory (str): A per-training run directory where to record stats. Created if it does not exist.
            video_callable (Optional[function, False]): function that takes in the index of the episode and outputs a boolean, indicating whether we should record a video on this episode. The default (for video_callable is None) is to take perfect cubes, capped at 1000. False disables video recording.
            force (bool): Clear out existing training data from this directory (by deleting every file detect_monitor_files() reports).
            resume (bool): Retain the training data already in this directory, which will be merged with our new data
            seed (Optional[int]): The seed to run this environment with. By default, a random seed will be chosen.
            write_upon_reset (bool): Write the manifest file on each reset. (This is currently a JSON file, so writing it is somewhat expensive.)

        Raises:
            error.Error: If video_callable is neither callable, None nor
                False, or if the directory already holds manifests and
                neither force nor resume was given.
        """
        if self.env.spec is None:
            logger.warning(" Trying to monitor an environment which has no ' spec ' set. This usually means you did not create it via ' gym.make ' , and is recommended only for advanced users. ")

        if not os.path.exists(directory):
            logger.info(' Creating monitor directory %s ', directory)
            os.makedirs(directory)

        if video_callable is None:
            video_callable = capped_cubic_video_schedule
        elif video_callable == False:  # noqa: E712 -- equality kept on purpose for backward compatibility
            video_callable = disable_videos
        elif not callable(video_callable):
            raise error.Error(' You must provide a function, None, or False for video_callable, not {} : {} '.format(type(video_callable), video_callable))

        # Check on whether we need to clear anything
        if force:
            clear_monitor_files(directory)
        elif not resume:
            training_manifests = detect_training_manifests(directory)
            if len(training_manifests) > 0:
                raise error.Error(''' Trying to write to monitor directory {} with existing monitor files: {} .
You should use a unique directory for each training run , or use ' force=True ' to automatically clear previous monitor files . '''.format(directory, ' , '.join(training_manifests[:5])))

        self._monitor_id = monitor_closer.register(self)

        self.enabled = True
        self.directory = os.path.abspath(directory)
        # We use the 'openai-gym' prefix to determine if a file is
        # ours
        self.file_prefix = FILE_PREFIX
        # The monitor id plus the pid keeps file names distinct between
        # monitors and between processes sharing the directory.
        self.file_infix = ' {} . {} '.format(self._monitor_id, os.getpid())
        # Hand the recorder the absolute directory, like the manifest and
        # video paths, so a later chdir cannot split the output across
        # two locations.
        self.stats_recorder = stats_recorder.StatsRecorder(self.directory, ' {} .episode_batch. {} '.format(self.file_prefix, self.file_infix))
        self.configure(video_callable=video_callable)
        self.write_upon_reset = write_upon_reset

        seeds = self.env.seed(seed)
        self.seeds = seeds

    def flush(self, force=False):
        """Flush all relevant monitor information to disk.

        The stats recorder is always flushed. The manifest is only
        (re)written when write_upon_reset was requested in start() or when
        force is True.

        Args:
            force (bool): Write the manifest regardless of write_upon_reset.
        """
        self.stats_recorder.flush()

        if not self.write_upon_reset and not force:
            return

        # Give it a very distinguished name, since we need to pick it
        # up from the filesystem later.
        path = os.path.join(self.directory, ' {} .manifest. {} .manifest.json '.format(self.file_prefix, self.file_infix))
        logger.debug(' Writing training manifest file to %s ', path)
        with atomic_write.atomic_write(path) as f:
            # We need to write relative paths here since people may
            # move the training_dir around. It would be cleaner to
            # already have the basenames rather than basename'ing
            # manually, but this works for now.
            json.dump({
                ' stats ': os.path.basename(self.stats_recorder.path),
                ' videos ': [(os.path.basename(v), os.path.basename(m))
                             for v, m in self.videos],
                ' env_info ': self._env_info(),
                ' seeds ': self.seeds,
            }, f)

    def close(self):
        """Flush all monitor data to disk and close any open rendering windows.

        Safe to call on a monitor that was never started, was already
        closed, or was only partially constructed: it is a no-op then.
        """
        # __del__ calls close() even when __init__ raised before 'enabled'
        # was assigned, so the attribute may legitimately be missing.
        if not getattr(self, 'enabled', False):
            return
        self.stats_recorder.close()
        if self.video_recorder is not None:
            self._close_video_recorder()
        self.flush(force=True)

        env = self._env_ref()
        # Only take action if the env hasn't been GC'd
        if env is not None:
            # Note we'll close the env's rendering window even if we did
            # not open it. There isn't a particular great way to know if
            # we did, since some environments will have a window pop up
            # during video recording.
            try:
                env.render(close=True)
            except Exception as e:
                if env.spec:
                    key = env.spec.id
                else:
                    key = env
                # We don't want to avoid writing the manifest simply
                # because we couldn't close the renderer.
                logger.error(' Could not close renderer for %s : %s ', key, e)

            # Remove the env's pointer to this monitor
            try:
                del env._monitor
            except AttributeError:
                # The env holds no '_monitor' attribute (presumably the
                # monitor was not created through 'env.monitor'). There
                # is nothing to detach, and we must still unregister below.
                pass

        # Stop tracking this for autoclose
        monitor_closer.unregister(self._monitor_id)
        self.enabled = False

        logger.info(''' Finished writing results. You can upload them to the scoreboard via gym.upload( %r ) ''', self.directory)

    def configure(self, video_callable=None):
        """Reconfigure the monitor.

        Args:
            video_callable (function): Takes the episode id and returns
                whether to record a video for it. None leaves the current
                setting untouched.
        """
        if video_callable is not None:
            self.video_callable = video_callable

    def _before_step(self, action):
        """Forward the upcoming action to the stats recorder (no-op while disabled)."""
        if not self.enabled:
            return
        self.stats_recorder.before_step(action)

    def _after_step(self, observation, reward, done, info):
        """Record the outcome of a step and return the effective 'done' flag.

        While disabled, 'done' is returned unchanged. While enabled, 'done'
        is forced to True once the env spec's timestep limit is reached.
        """
        if not self.enabled:
            return done

        # Add 1 since about to take another step
        if self.env.spec and self.stats_recorder.steps + 1 >= self.env.spec.timestep_limit:
            logger.info(' Ending episode %i because it reached the timestep limit of %i . ', self.episode_id, self.env.spec.timestep_limit)
            done = True

        # Record stats
        self.stats_recorder.after_step(observation, reward, done, info)
        # Record video
        self.video_recorder.capture_frame()

        return done

    def _before_reset(self):
        """Notify the stats recorder that a reset is about to happen (no-op while disabled)."""
        if not self.enabled:
            return
        self.stats_recorder.before_reset()

    def _after_reset(self, observation):
        """Start bookkeeping for a new episode (no-op while disabled).

        Closes the previous video recorder, opens a new one for this
        episode, bumps episode_id and flushes.
        """
        if not self.enabled:
            return

        # Reset the stat count
        self.stats_recorder.after_reset(observation)

        # Close any existing video recorder
        if self.video_recorder:
            self._close_video_recorder()

        # Start recording the next video.
        #
        # TODO: calculate a more correct 'episode_id' upon merge
        self.video_recorder = video_recorder.VideoRecorder(
            env=self.env,
            base_path=os.path.join(self.directory, ' {} .video. {} .video {:06} '.format(self.file_prefix, self.file_infix, self.episode_id)),
            metadata={' episode_id ': self.episode_id},
            enabled=self._video_enabled(),
        )
        self.video_recorder.capture_frame()

        # Bump *after* all reset activity has finished
        self.episode_id += 1

        self.flush()

    def _close_video_recorder(self):
        """Close the current video recorder and remember its output if it was functional."""
        self.video_recorder.close()
        if self.video_recorder.functional:
            self.videos.append((self.video_recorder.path, self.video_recorder.metadata_path))

    def _video_enabled(self):
        """Ask the configured video_callable whether the current episode should be recorded."""
        return self.video_callable(self.episode_id)

    def _env_info(self):
        """Build the env_info dict stored in the manifest.

        Always contains the gym version; the env id is only included when
        the env has a spec.
        """
        env_info = {
            ' gym_version ': version.VERSION,
        }
        if self.env.spec:
            env_info[' env_id '] = self.env.spec.id
        return env_info

    def __del__(self):
        # Make sure we've closed up shop when garbage collecting
        self.close()
def load_results(training_dir):
    """Load and merge everything the monitors wrote into training_dir.

    Every manifest found by detect_training_manifests() is read; the stats
    files they point to are merged with merge_stats_files() and their
    env_info entries are reconciled with collapse_env_infos().

    Args:
        training_dir (str): Directory holding the monitor output.

    Returns:
        dict or None: None when training_dir does not exist or contains no
        manifests. Otherwise a dict with the manifest paths, the collapsed
        env_info, the merged timestamps / episode lengths / episode rewards,
        the initial reset timestamp, the (video, metadata) path pairs, the
        first seed of each manifest ('main_seeds', None where a manifest has
        no seeds) and the concatenation of all seeds.

    Raises:
        error.Error: Propagated from collapse_env_infos() when the manifests
            disagree or lack expected keys.
    """
    if not os.path.exists(training_dir):
        return

    manifests = detect_training_manifests(training_dir)
    if not manifests:
        return

    logger.debug(' Uploading data from manifest %s ', ' , '.join(manifests))

    # Load up stats + video files
    stats_files = []
    videos = []
    main_seeds = []
    seeds = []
    env_infos = []

    for manifest in manifests:
        with open(manifest) as f:
            contents = json.load(f)

        # Make these paths absolute again
        stats_files.append(os.path.join(training_dir, contents[' stats ']))
        videos += [(os.path.join(training_dir, v), os.path.join(training_dir, m))
                   for v, m in contents[' videos ']]
        env_infos.append(contents[' env_info '])

        # The manifest may lack the key, or store null (Monitor.seeds is
        # None until the env reports seeds) or an empty list. Normalise all
        # of these to [] so the concatenation below cannot fail on None.
        current_seeds = contents.get(' seeds ') or []
        seeds += current_seeds
        if current_seeds:
            main_seeds.append(current_seeds[0])
        else:
            main_seeds.append(None)

    env_info = collapse_env_infos(env_infos, training_dir)
    timestamps, episode_lengths, episode_rewards, initial_reset_timestamp = merge_stats_files(stats_files)

    return {
        ' manifests ': manifests,
        ' env_info ': env_info,
        ' timestamps ': timestamps,
        ' episode_lengths ': episode_lengths,
        ' episode_rewards ': episode_rewards,
        ' initial_reset_timestamp ': initial_reset_timestamp,
        ' videos ': videos,
        ' main_seeds ': main_seeds,
        ' seeds ': seeds,
    }
def merge_stats_files(stats_files):
    """Merge several stats JSON files into one timeline ordered by timestamp.

    Args:
        stats_files (list): Paths of JSON files, each holding the
            timestamps, episode lengths, episode rewards and initial reset
            timestamp of one recorder.

    Returns:
        tuple: (timestamps, episode_lengths, episode_rewards,
        initial_reset_timestamp). The three lists are plain Python lists
        reordered by ascending timestamp; initial_reset_timestamp is the
        smallest one found across the files, or 0 when stats_files is empty.
    """
    timestamps = []
    episode_lengths = []
    episode_rewards = []
    initial_reset_timestamps = []

    for path in stats_files:
        with open(path) as f:
            content = json.load(f)

        timestamps += content[' timestamps ']
        episode_lengths += content[' episode_lengths ']
        episode_rewards += content[' episode_rewards ']
        initial_reset_timestamps.append(content[' initial_reset_timestamp '])

    # Apply one shared permutation so the three per-episode lists stay aligned.
    idxs = np.argsort(timestamps)
    timestamps = np.array(timestamps)[idxs].tolist()
    episode_lengths = np.array(episode_lengths)[idxs].tolist()
    episode_rewards = np.array(episode_rewards)[idxs].tolist()

    # min() raises ValueError on an empty sequence, so an empty input needs
    # an explicit fallback.
    if initial_reset_timestamps:
        initial_reset_timestamp = min(initial_reset_timestamps)
    else:
        initial_reset_timestamp = 0

    return timestamps, episode_lengths, episode_rewards, initial_reset_timestamp
2016-04-27 08:00:58 -07:00
def collapse_env_infos(env_infos, training_dir):
    """Reduce the env_info entries of several manifests to a single one.

    All entries must compare equal, and the shared entry must contain the
    expected keys.

    Args:
        env_infos (list): env_info dicts, one per manifest; must be non-empty.
        training_dir (str): Only used to build error messages.

    Returns:
        dict: The first element of env_infos.

    Raises:
        error.Error: If two entries differ or an expected key is missing.
    """
    assert len(env_infos) > 0

    reference = env_infos[0]
    remaining = env_infos[1:]
    for candidate in remaining:
        if reference != candidate:
            raise error.Error(' Found two unequal env_infos: {} and {} . This usually indicates that your training directory {} has commingled results from multiple runs. '.format(reference, candidate, training_dir))

    required_keys = [' env_id ', ' gym_version ']
    absent_keys = [name for name in required_keys if name not in reference]
    if absent_keys:
        # Report the first missing key, in the order the keys are checked.
        raise error.Error(" env_info {} from training directory {} is missing expected key {} . This is unexpected and likely indicates a bug in gym. ".format(reference, training_dir, absent_keys[0]))
    return reference