Gymnasium/gym/envs/algorithmic/algorithmic_env.py
Greg Brockman 58e6aa95e5 [WIP] add support for seeding environments (#135)
* Make environments seedable

* Fix monitor bugs

- Set monitor_id before setting the infix. This was a bug that would yield incorrect results with multiple monitors.
- Remove extra pid from stats recorder filename. This should be purely cosmetic.

* Start uploading seeds in episode_batch

* Fix _bigint_from_bytes for python3

* Set seed explicitly in random_agent

* Pass through seed argument

* Also pass through random state to spaces

* Pass random state into the observation/action spaces

* Make all _seed methods return the list of used seeds

* Switch over to np.random where possible

* Start hashing seeds, and also seed doom engine

* Fixup seeding determinism in many cases

* Seed before loading the ROM

* Make seeding more Python3 friendly

* Make the MuJoCo skipping a bit more forgiving

* Remove debugging PDB calls

* Make setInt argument into raw bytes

* Validate and upload seeds

* Skip box2d

* Make seeds smaller, and change representation of seeds in upload

* Handle long seeds

* Fix RandomAgent example to be deterministic

* Handle integer types correctly in Python2 and Python3

* Try caching pip

* Try adding swap

* Add df and free calls

* Bump swap

* Bump swap size

* Try setting overcommit

* Try other sysctls

* Try fixing overcommit

* Try just setting overcommit_memory=1

* Add explanatory comment

* Add what's new section to readme

* BUG: Mark ElevatorAction-ram-v0 as non-deterministic for now

* Document seed

* Move nondeterministic check into spec
2016-05-29 09:07:09 -07:00
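
The seeding changes listed above surface in the `_seed` method of the file below: `seeding.np_random(seed)` returns both a seeded random state and the seed that was actually used, and `_seed` hands that seed back in a list so callers can record and replay runs. A rough standalone sketch of the convention (illustrative only, not part of this commit):

    from gym.utils import seeding

    # Passing None lets the helper draw a fresh seed; feeding the returned
    # seed back in reproduces the same random state.
    np_random, used_seed = seeding.np_random(None)
    replayed, _ = seeding.np_random(used_seed)
    assert np_random.randint(1000) == replayed.randint(1000)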


from gym import Env
from gym.spaces import Discrete, Tuple
from gym.utils import colorize, seeding
import numpy as np
from six import StringIO
import sys
import math

hash_base = None

def ha(array):
    # Hash an (inp_dim,)-shaped tape position into a single dict key.
    return (hash_base * (array + 5)).sum()
class AlgorithmicEnv(Env):
    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, inp_dim=1, base=10, chars=False):
        global hash_base
        hash_base = 50 ** np.arange(inp_dim)
        self.base = base
        self.last = 10
        self.total_reward = 0
        self.sum_reward = 0
        AlgorithmicEnv.sum_rewards = []
        self.chars = chars
        self.inp_dim = inp_dim
        # Curriculum state shared across instances: inputs start at length 2.
        AlgorithmicEnv.current_length = 2
        tape_control = []
        self._seed()
        self.reset()
    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        # The spaces share the environment's seeded random state so that
        # sampling from them is reproducible as well.
        self.action_space = Tuple(([Discrete(2 * self.inp_dim, np_random=self.np_random),
                                    Discrete(2, np_random=self.np_random),
                                    Discrete(self.base, np_random=self.np_random)]))
        self.observation_space = Discrete(self.base + 1, np_random=self.np_random)
        return [seed]
    def _get_obs(self, pos=None):
        if pos is None:
            pos = self.x
        assert isinstance(pos, np.ndarray) and pos.shape[0] == self.inp_dim
        # Cells that were never written read as the blank symbol (self.base).
        if ha(pos) not in self.content:
            self.content[ha(pos)] = self.base
        return self.content[ha(pos)]

    def _get_str_obs(self, pos=None):
        ret = self._get_obs(pos)
        if ret == self.base:
            return " "
        else:
            if self.chars:
                return chr(ret + ord('A'))
            return str(ret)

    def _get_str_target(self, pos=None):
        if pos not in self.target:
            return " "
        else:
            ret = self.target[pos]
            if self.chars:
                return chr(ret + ord('A'))
            return str(ret)
    def _render_observation(self):
        x = self.x
        if self.inp_dim == 1:
            x_str = "Observation Tape : "
            for i in range(-2, self.total_len + 2):
                if i == x:
                    x_str += colorize(self._get_str_obs(np.array([i])), 'green', highlight=True)
                else:
                    x_str += self._get_str_obs(np.array([i]))
            x_str += "\n"
            return x_str
        elif self.inp_dim == 2:
            label = "Observation Grid : "
            x_str = ""
            for j in range(-1, 3):
                if j != -1:
                    x_str += " " * len(label)
                for i in range(-2, self.total_len + 2):
                    if i == x[0] and j == x[1]:
                        x_str += colorize(self._get_str_obs(np.array([i, j])), 'green', highlight=True)
                    else:
                        x_str += self._get_str_obs(np.array([i, j]))
                x_str += "\n"
            x_str = label + x_str
            return x_str
        else:
            assert False
    def _render(self, mode='human', close=False):
        if close:
            # Nothing interesting to close
            return

        outfile = StringIO() if mode == 'ansi' else sys.stdout
        inp = "Total length of input instance: %d, step: %d\n" % (self.total_len, self.time)
        outfile.write(inp)
        x, y, action = self.x, self.y, self.last_action
        if action is not None:
            inp_act, out_act, pred = action
        outfile.write("=" * (len(inp) - 1) + "\n")
        y_str = "Output Tape : "
        target_str = "Targets : "
        if action is not None:
            if self.chars:
                pred_str = chr(pred + ord('A'))
            else:
                pred_str = str(pred)
        x_str = self._render_observation()
        max_len = int(self.total_reward) + 1
        for i in range(-2, max_len):
            if i not in self.target:
                y_str += " "
                continue
            target_str += self._get_str_target(i)
            if i < y - 1:
                y_str += self._get_str_target(i)
            elif i == (y - 1):
                if action is not None and out_act == 1:
                    if pred == self.target[i]:
                        y_str += colorize(pred_str, 'green', highlight=True)
                    else:
                        y_str += colorize(pred_str, 'red', highlight=True)
                else:
                    y_str += self._get_str_target(i)
        outfile.write(x_str)
        outfile.write(y_str + "\n")
        outfile.write(target_str + "\n\n")

        if action is not None:
            outfile.write("Current reward : %.3f\n" % self.reward)
            outfile.write("Cumulative reward : %.3f\n" % self.sum_reward)
            move = ""
            if inp_act == 0:
                move = "left"
            elif inp_act == 1:
                move = "right"
            elif inp_act == 2:
                move += "up"
            elif inp_act == 3:
                move += "down"
            outfile.write("Action : Tuple(move over input: %s,\n" % move)
            if out_act == 1:
                out_act = "True"
            else:
                out_act = "False"
            outfile.write(" write to the output tape: %s,\n" % out_act)
            outfile.write(" prediction: %s)\n" % pred_str)
        else:
            outfile.write("\n" * 5)
        return outfile
    def _step(self, action):
        self.last_action = action
        inp_act, out_act, pred = action
        done = False
        reward = 0.0
        self.time += 1
        if self.y not in self.target:
            # We are outside the sample: heavy penalty and the episode ends.
            reward = -10.0
            done = True
        else:
            if out_act == 1:
                if pred == self.target[self.y]:
                    reward = 1.0
                else:
                    reward = -0.5
                    done = True
                self.y += 1
                if self.y not in self.target:
                    done = True
            if inp_act == 0:
                self.x[0] -= 1
            elif inp_act == 1:
                self.x[0] += 1
            elif inp_act == 2:
                self.x[1] -= 1
            elif inp_act == 3:
                self.x[1] += 1
            # Time limit: the agent only gets a few steps beyond the input length.
            if self.time > self.total_len + self.total_reward + 4:
                reward = -1.0
                done = True
        obs = self._get_obs()
        self.reward = reward
        self.sum_reward += reward
        return (obs, reward, done, {})
    def _reset(self):
        self.last_action = None
        self.x = np.zeros(self.inp_dim).astype(int)  # np.int was removed in newer NumPy
        self.y = 0
        # Simple curriculum: once the last `self.last` episodes all scored well
        # enough, grow the length of generated inputs (capped at 30).
        AlgorithmicEnv.sum_rewards.append(self.sum_reward - self.total_reward)
        AlgorithmicEnv.sum_rewards = AlgorithmicEnv.sum_rewards[-self.last:]
        if len(AlgorithmicEnv.sum_rewards) == self.last and \
                min(AlgorithmicEnv.sum_rewards) >= -1.0 and \
                AlgorithmicEnv.current_length < 30:
            AlgorithmicEnv.current_length += 1
            AlgorithmicEnv.sum_rewards = []
        self.sum_reward = 0.0
        self.time = 0
        self.total_len = self.np_random.randint(3) + AlgorithmicEnv.current_length
        self.set_data()
        return self._get_obs()
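
AlgorithmicEnv is abstract in practice: `_reset` calls `self.set_data()`, and `_get_obs`/`_step` expect a concrete task to populate `self.content` (the input tape), `self.target` (the expected output), and `self.total_reward`. A minimal sketch of what such a subclass might look like, reusing the module namespace above; the class name and its `set_data` body are illustrative, not part of this file, and the driver loop assumes the standard gym `Env.step()`/`reset()` wrappers that dispatch to `_step`/`_reset`:

    class EchoEnv(AlgorithmicEnv):
        """Illustrative task: reproduce the input tape on the output tape."""
        def __init__(self, base=10):
            super(EchoEnv, self).__init__(inp_dim=1, base=base)

        def set_data(self):
            self.content = {}
            self.target = {}
            for i in range(self.total_len):
                symbol = self.np_random.randint(self.base)
                self.content[ha(np.array([i]))] = symbol  # input tape cell i
                self.target[i] = symbol                   # expected output at i
            self.total_reward = self.total_len

    env = EchoEnv()
    env._seed(0)          # returns the list of seeds actually used
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()          # (move, write?, prediction)
        obs, reward, done, info = env.step(action)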