Files
Gymnasium/gym/envs/doom/doom_env.py

416 lines
17 KiB
Python

import logging
import os
from time import sleep
import numpy as np
import gym
from gym import utils, spaces
from gym.utils import seeding
try:
import doom_py
from doom_py import DoomGame, Mode, Button, GameVariable, ScreenFormat, ScreenResolution, Loader
except ImportError as e:
raise gym.error.DependencyNotInstalled("{}. (HINT: you can install Doom dependencies " +
"with 'pip install gym[doom].)'".format(e))
logger = logging.getLogger(__name__)
# Constants
NUM_ACTIONS = 43
NUM_LEVELS = 9
CONFIG = 0
SCENARIO = 1
MAP = 2
DIFFICULTY = 3
ACTIONS = 4
MIN_SCORE = 5
TARGET_SCORE = 6
# Format (config, scenario, map, difficulty, actions, min, target)
DOOM_SETTINGS = [
['basic.cfg', 'basic.wad', 'map01', 5, [0, 10, 11], -485, 10], # 0 - Basic
['deadly_corridor.cfg', 'deadly_corridor.wad', '', 1, [0, 10, 11, 13, 14, 15], -120, 1000], # 1 - Corridor
['defend_the_center.cfg', 'defend_the_center.wad', '', 5, [0, 14, 15], -1, 10], # 2 - DefendCenter
['defend_the_line.cfg', 'defend_the_line.wad', '', 5, [0, 14, 15], -1, 15], # 3 - DefendLine
['health_gathering.cfg', 'health_gathering.wad', 'map01', 5, [13, 14, 15], 0, 1000], # 4 - HealthGathering
['my_way_home.cfg', 'my_way_home.wad', '', 5, [13, 14, 15], -0.22, 0.5], # 5 - MyWayHome
['predict_position.cfg', 'predict_position.wad', 'map01', 3, [0, 14, 15], -0.075, 0.5], # 6 - PredictPosition
['take_cover.cfg', 'take_cover.wad', 'map01', 5, [10, 11], 0, 750], # 7 - TakeCover
['deathmatch.cfg', 'deathmatch.wad', '', 5, [x for x in range(NUM_ACTIONS) if x != 33], 0, 20] # 8 - Deathmatch
]
class DoomEnv(gym.Env, utils.EzPickle):
metadata = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 35}
def __init__(self, level):
utils.EzPickle.__init__(self)
self.previous_level = -1
self.level = level
self.game = DoomGame()
self.loader = Loader()
self.doom_dir = os.path.dirname(os.path.abspath(__file__))
self.mode = 'fast' # 'human', 'fast' or 'normal'
self.no_render = False # To disable double rendering in human mode
self.viewer = None
self.is_initialized = False # Indicates that reset() has been called
self.curr_seed = 0
self.action_space = spaces.MultiDiscrete([[0, 1]] * 38 + [[-10, 10]] * 2 + [[-100, 100]] * 3)
self.allowed_actions = list(range(NUM_ACTIONS))
self._seed()
self._configure()
def _configure(self, screen_resolution=ScreenResolution.RES_640X480):
# Often agents end up downsampling the observations. Configuring Doom to
# return a smaller image yields significant (~10x) speedups
if screen_resolution == ScreenResolution.RES_640X480:
self.screen_height = 480
self.screen_width = 640
self.screen_resolution = ScreenResolution.RES_640X480
elif screen_resolution == ScreenResolution.RES_160X120:
self.screen_height = 120
self.screen_width = 160
self.screen_resolution = ScreenResolution.RES_160X120
self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
def _load_level(self):
# Closing if is_initialized
if self.is_initialized:
self.is_initialized = False
self.game.close()
self.game = DoomGame()
# Loading Paths
if not self.is_initialized:
self.game.set_vizdoom_path(self.loader.get_vizdoom_path())
self.game.set_doom_game_path(self.loader.get_freedoom_path())
# Common settings
self._closed = False
self.game.load_config(os.path.join(self.doom_dir, 'assets/%s' % DOOM_SETTINGS[self.level][CONFIG]))
self.game.set_doom_scenario_path(self.loader.get_scenario_path(DOOM_SETTINGS[self.level][SCENARIO]))
if DOOM_SETTINGS[self.level][MAP] != '':
self.game.set_doom_map(DOOM_SETTINGS[self.level][MAP])
self.game.set_doom_skill(DOOM_SETTINGS[self.level][DIFFICULTY])
self.previous_level = self.level
self.allowed_actions = DOOM_SETTINGS[self.level][ACTIONS]
self.game.set_screen_resolution(self.screen_resolution)
# Algo mode
if 'human' != self.mode:
self.game.set_window_visible(False)
self.game.set_mode(Mode.PLAYER)
self.no_render = False
self.game.init()
self._start_episode()
self.is_initialized = True
return self.game.get_state().image_buffer.copy()
# Human mode
else:
self.game.add_game_args('+freelook 1')
self.game.set_window_visible(True)
self.game.set_mode(Mode.SPECTATOR)
self.no_render = True
self.game.init()
self._start_episode()
self.is_initialized = True
self._play_human_mode()
return np.zeros(shape=self.observation_space.shape, dtype=np.uint8)
def _start_episode(self):
if self.curr_seed > 0:
self.game.set_seed(self.curr_seed)
self.curr_seed = 0
self.game.new_episode()
return
def _play_human_mode(self):
while not self.game.is_episode_finished():
self.game.advance_action()
state = self.game.get_state()
total_reward = self.game.get_total_reward()
info = self._get_game_variables(state.game_variables)
info["TOTAL_REWARD"] = round(total_reward, 4)
print('===============================')
print('State: #' + str(state.number))
print('Action: \t' + str(self.game.get_last_action()) + '\t (=> only allowed actions)')
print('Reward: \t' + str(self.game.get_last_reward()))
print('Total Reward: \t' + str(total_reward))
print('Variables: \n' + str(info))
sleep(0.02857) # 35 fps = 0.02857 sleep between frames
print('===============================')
print('Done')
return
def _step(self, action):
if NUM_ACTIONS != len(action):
logger.warn('Doom action list must contain %d items. Padding missing items with 0' % NUM_ACTIONS)
old_action = action
action = [0] * NUM_ACTIONS
for i in range(len(old_action)):
action[i] = old_action[i]
# action is a list of numbers but DoomGame.make_action expects a list of ints
if len(self.allowed_actions) > 0:
list_action = [int(action[action_idx]) for action_idx in self.allowed_actions]
else:
list_action = [int(x) for x in action]
try:
reward = self.game.make_action(list_action)
state = self.game.get_state()
info = self._get_game_variables(state.game_variables)
info["TOTAL_REWARD"] = round(self.game.get_total_reward(), 4)
if self.game.is_episode_finished():
is_finished = True
return np.zeros(shape=self.observation_space.shape, dtype=np.uint8), reward, is_finished, info
else:
is_finished = False
return state.image_buffer.copy(), reward, is_finished, info
except doom_py.vizdoom.ViZDoomIsNotRunningException:
return np.zeros(shape=self.observation_space.shape, dtype=np.uint8), 0, True, {}
def _reset(self):
if self.is_initialized and not self._closed:
self._start_episode()
return self.game.get_state().image_buffer.copy()
else:
return self._load_level()
def _render(self, mode='human', close=False):
if close:
if self.viewer is not None:
self.viewer.close()
self.viewer = None # If we don't None out this reference pyglet becomes unhappy
return
try:
if 'human' == mode and self.no_render:
return
state = self.game.get_state()
img = state.image_buffer
# VizDoom returns None if the episode is finished, let's make it
# an empty image so the recorder doesn't stop
if img is None:
img = np.zeros(shape=self.observation_space.shape, dtype=np.uint8)
if mode == 'rgb_array':
return img
elif mode is 'human':
from gym.envs.classic_control import rendering
if self.viewer is None:
self.viewer = rendering.SimpleImageViewer()
self.viewer.imshow(img)
if 'normal' == self.mode:
sleep(0.02857) # 35 fps = 0.02857 sleep between frames
except doom_py.vizdoom.ViZDoomIsNotRunningException:
pass # Doom has been closed
def _close(self):
self.game.close()
def _seed(self, seed=None):
self.curr_seed = seeding.hash_seed(seed) % 2 ** 32
return [self.curr_seed]
def _get_game_variables(self, state_variables):
info = {
"LEVEL": self.level
}
if state_variables is None:
return info
info['KILLCOUNT'] = state_variables[0]
info['ITEMCOUNT'] = state_variables[1]
info['SECRETCOUNT'] = state_variables[2]
info['FRAGCOUNT'] = state_variables[3]
info['HEALTH'] = state_variables[4]
info['ARMOR'] = state_variables[5]
info['DEAD'] = state_variables[6]
info['ON_GROUND'] = state_variables[7]
info['ATTACK_READY'] = state_variables[8]
info['ALTATTACK_READY'] = state_variables[9]
info['SELECTED_WEAPON'] = state_variables[10]
info['SELECTED_WEAPON_AMMO'] = state_variables[11]
info['AMMO1'] = state_variables[12]
info['AMMO2'] = state_variables[13]
info['AMMO3'] = state_variables[14]
info['AMMO4'] = state_variables[15]
info['AMMO5'] = state_variables[16]
info['AMMO6'] = state_variables[17]
info['AMMO7'] = state_variables[18]
info['AMMO8'] = state_variables[19]
info['AMMO9'] = state_variables[20]
info['AMMO0'] = state_variables[21]
return info
class MetaDoomEnv(DoomEnv):
def __init__(self, average_over=10, passing_grade=600, min_tries_for_avg=5):
super(MetaDoomEnv, self).__init__(0)
self.average_over = average_over
self.passing_grade = passing_grade
self.min_tries_for_avg = min_tries_for_avg # Need to use at least this number of tries to calc avg
self.scores = [[]] * NUM_LEVELS
self.locked_levels = [True] * NUM_LEVELS # Locking all levels but the first
self.locked_levels[0] = False
self.total_reward = 0
self.find_new_level = False # Indicates that we need a level change
self._unlock_levels()
def _play_human_mode(self):
while not self.game.is_episode_finished():
self.game.advance_action()
state = self.game.get_state()
episode_reward = self.game.get_total_reward()
(reward, self.total_reward) = self._calculate_reward(episode_reward, self.total_reward)
info = self._get_game_variables(state.game_variables)
info["SCORES"] = self.get_scores()
info["TOTAL_REWARD"] = round(self.total_reward, 4)
info["LOCKED_LEVELS"] = self.locked_levels
print('===============================')
print('State: #' + str(state.number))
print('Action: \t' + str(self.game.get_last_action()) + '\t (=> only allowed actions)')
print('Reward: \t' + str(reward))
print('Total Reward: \t' + str(self.total_reward))
print('Variables: \n' + str(info))
sleep(0.02857) # 35 fps = 0.02857 sleep between frames
print('===============================')
print('Done')
return
def _get_next_level(self):
# Finds the unlocked level with the lowest average
averages = self.get_scores()
lowest_level = 0 # Defaulting to first level
lowest_score = 1001
for i in range(NUM_LEVELS):
if not self.locked_levels[i]:
if averages[i] < lowest_score:
lowest_level = i
lowest_score = averages[i]
return lowest_level
def _unlock_levels(self):
averages = self.get_scores()
for i in range(NUM_LEVELS - 2, -1, -1):
if self.locked_levels[i + 1] and averages[i] >= self.passing_grade:
self.locked_levels[i + 1] = False
return
def _start_episode(self):
if 0 == len(self.scores[self.level]):
self.scores[self.level] = [0] * self.min_tries_for_avg
else:
self.scores[self.level].insert(0, 0)
self.scores[self.level] = self.scores[self.level][:self.min_tries_for_avg]
self.is_new_episode = True
return super(MetaDoomEnv, self)._start_episode()
def change_level(self, new_level=None):
if new_level is not None and self.locked_levels[new_level] == False:
self.find_new_level = False
self.level = new_level
self.reset()
else:
self.find_new_level = False
self.level = self._get_next_level()
self.reset()
return
def _get_standard_reward(self, episode_reward):
# Returns a standardized reward for an episode (i.e. between 0 and 1,000)
min_score = float(DOOM_SETTINGS[self.level][MIN_SCORE])
target_score = float(DOOM_SETTINGS[self.level][TARGET_SCORE])
max_score = min_score + (target_score - min_score) / 0.99 # Target is 99th percentile (Scale 0-1000)
std_reward = round(1000 * (episode_reward - min_score) / (max_score - min_score), 4)
std_reward = min(1000, std_reward) # Cannot be more than 1,000
std_reward = max(0, std_reward) # Cannot be less than 0
return std_reward
def get_total_reward(self):
# Returns the sum of the average of all levels
total_score = 0
passed_levels = 0
for i in range(NUM_LEVELS):
if len(self.scores[i]) > 0:
level_total = 0
level_count = min(len(self.scores[i]), self.average_over)
for j in range(level_count):
level_total += self.scores[i][j]
level_average = level_total / level_count
if level_average >= 990:
passed_levels += 1
total_score += level_average
# Bonus for passing all levels (50 * num of levels)
if NUM_LEVELS == passed_levels:
total_score += NUM_LEVELS * 50
return round(total_score, 4)
def _calculate_reward(self, episode_reward, prev_total_reward):
# Calculates the action reward and the new total reward
std_reward = self._get_standard_reward(episode_reward)
self.scores[self.level][0] = std_reward
total_reward = self.get_total_reward()
reward = total_reward - prev_total_reward
return reward, total_reward
def get_scores(self):
# Returns a list with the averages per level
averages = [0] * NUM_LEVELS
for i in range(NUM_LEVELS):
if len(self.scores[i]) > 0:
level_total = 0
level_count = min(len(self.scores[i]), self.average_over)
for j in range(level_count):
level_total += self.scores[i][j]
level_average = level_total / level_count
averages[i] = round(level_average, 4)
return averages
def _reset(self):
# Reset is called on first step() after level is finished
# or when change_level() is called. Returning if neither have been called to
# avoid resetting the level twice
if self.find_new_level:
return
if self.is_initialized and not self._closed and self.previous_level == self.level:
self._start_episode()
return self.game.get_state().image_buffer.copy()
else:
return self._load_level()
def _step(self, action):
# Changing level
if self.find_new_level:
self.change_level()
if 'human' == self.mode:
self._play_human_mode()
obs = np.zeros(shape=self.observation_space.shape, dtype=np.uint8)
reward = 0
is_finished = True
info = self._get_game_variables(None)
else:
obs, step_reward, is_finished, info = super(MetaDoomEnv, self)._step(action)
reward, self.total_reward = self._calculate_reward(self.game.get_total_reward(), self.total_reward)
# First step() after new episode returns the entire total reward
# because stats_recorder resets the episode score to 0 after reset() is called
if self.is_new_episode:
reward = self.total_reward
self.is_new_episode = False
info["SCORES"] = self.get_scores()
info["TOTAL_REWARD"] = round(self.total_reward, 4)
info["LOCKED_LEVELS"] = self.locked_levels
# Indicating new level required
if is_finished:
self._unlock_levels()
self.find_new_level = True
return obs, reward, is_finished, info