Algorithmic refactor (#383)

* Refactor/document algorithmic environments and add tests.

* test for 3 row addition

* Fix failing rollout test by reinserting quirk in reversedAddition env

* todo regarding addition3-v0

* Fix python 3 division issues

* typo fix

* Re-generate python3 rollout file to account for ReversedAddition bug fix
Colin authored on 2016-10-21 16:06:48 -07:00; committed by Greg Brockman
parent bee6be5632
commit e84bd0ffe1
11 changed files with 4849 additions and 947 deletions

View File

@@ -1,4 +1,4 @@
from gym.envs.algorithmic.copy import CopyEnv
from gym.envs.algorithmic.copy_ import CopyEnv
from gym.envs.algorithmic.repeat_copy import RepeatCopyEnv
from gym.envs.algorithmic.duplicated_input import DuplicatedInputEnv
from gym.envs.algorithmic.reverse import ReverseEnv

View File

@@ -1,3 +1,35 @@
"""
Algorithmic environments have the following traits in common:
- A 1-d "input tape" or 2-d "input grid" of characters
- A target string which is a deterministic function of the input characters
Agents control a read head that moves over the input tape. Observations consist
of the single character currently under the read head. The read head may fall
off the end of the tape in any direction. When this happens, agents will observe
a special blank character (with index=env.base) until they get back in bounds.
Actions consist of 3 sub-actions:
- Direction to move the read head (left or right, plus up and down for 2-d envs)
- Whether to write to the output tape
- Which character to write (ignored if the above sub-action is 0)
An episode ends when:
- The agent writes the full target string to the output tape.
- The agent writes an incorrect character.
- The agent exceeds the time limit. (Which is fairly conservative.)
Reward schedule:
write a correct character: +1
write a wrong character: -.5
run out the clock: -1
otherwise: 0
In the beginning, input strings will be fairly short. After an environment has
been consistently solved over some window of episodes, the environment will
increase the average length of generated strings. Typical env specs require
leveling up many times to reach their reward threshold.
"""
from gym import Env
from gym.spaces import Discrete, Tuple
from gym.utils import colorize, seeding
@@ -5,93 +37,82 @@ import numpy as np
from six import StringIO
import sys
import math
import logging
hash_base = None
def ha(array):
return (hash_base * (array + 5)).sum()
logger = logging.getLogger(__name__)
class AlgorithmicEnv(Env):
metadata = {'render.modes': ['human', 'ansi']}
# Only 'promote' the length of generated input strings if the worst of the
# last n episodes was no more than this far from the maximum reward
MIN_REWARD_SHORTFALL_FOR_PROMOTION = -1.0
def __init__(self, inp_dim=1, base=10, chars=False):
global hash_base
hash_base = 50 ** np.arange(inp_dim)
def __init__(self, base=10, chars=False, starting_min_length=2):
"""
base: Number of distinct characters.
chars: If True, use uppercase alphabet. Otherwise, digits. Only affects
rendering.
starting_min_length: Minimum input string length. Ramps up as episodes
are consistently solved.
"""
self.base = base
# Keep track of this many past episodes
self.last = 10
self.total_reward = 0
self.sum_reward = 0
AlgorithmicEnv.sum_rewards = []
self.chars = chars
self.inp_dim = inp_dim
AlgorithmicEnv.current_length = 2
tape_control = []
self.action_space = Tuple(([Discrete(2 * self.inp_dim), Discrete(2), Discrete(self.base)]))
# Cumulative reward earned this episode
self.episode_total_reward = None
# Running tally of reward shortfalls. e.g. if there were 10 points to earn and
# we got 8, we'd append -2
AlgorithmicEnv.reward_shortfalls = []
if chars:
self.charmap = [chr(ord('A')+i) for i in range(base)]
else:
self.charmap = [str(i) for i in range(base)]
self.charmap.append(' ')
# TODO: Not clear why this is a class variable rather than instance.
# Could lead to some spooky action at a distance if someone is working
# with multiple algorithmic envs at once. Also makes testing tricky.
AlgorithmicEnv.min_length = starting_min_length
# Three sub-actions:
# 1. Move read head left or right (or up/down)
# 2. Write or not
# 3. Which character to write. (Ignored if should_write=0)
self.action_space = Tuple(
[Discrete(len(self.MOVEMENTS)), Discrete(2), Discrete(self.base)]
)
# Can see just what is on the input tape (one of n characters, or nothing)
self.observation_space = Discrete(self.base + 1)
self._seed()
self.reset()
@classmethod
def _movement_idx(kls, movement_name):
return kls.MOVEMENTS.index(movement_name)
def _seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def _get_obs(self, pos=None):
if pos is None:
pos = self.x
assert isinstance(pos, np.ndarray) and pos.shape[0] == self.inp_dim
if ha(pos) not in self.content:
self.content[ha(pos)] = self.base
return self.content[ha(pos)]
"""Return an observation corresponding to the given read head position
(or the current read head position, if none is given)."""
raise NotImplementedError
def _get_str_obs(self, pos=None):
ret = self._get_obs(pos)
if ret == self.base:
return " "
else:
if self.chars:
return chr(ret + ord('A'))
return str(ret)
return self.charmap[ret]
def _get_str_target(self, pos=None):
if pos not in self.target:
def _get_str_target(self, pos):
"""Return the ith character of the target string (or " " if index
out of bounds)."""
if pos < 0 or len(self.target) <= pos:
return " "
else:
ret = self.target[pos]
if self.chars:
return chr(ret + ord('A'))
return str(ret)
return self.charmap[self.target[pos]]
def _render_observation(self):
x = self.x
if self.inp_dim == 1:
x_str = "Observation Tape : "
for i in range(-2, self.total_len + 2):
if i == x:
x_str += colorize(self._get_str_obs(np.array([i])), 'green', highlight=True)
else:
x_str += self._get_str_obs(np.array([i]))
x_str += "\n"
return x_str
elif self.inp_dim == 2:
label = "Observation Grid : "
x_str = ""
for j in range(-1, 3):
if j != -1:
x_str += " " * len(label)
for i in range(-2, self.total_len + 2):
if i == x[0] and j == x[1]:
x_str += colorize(self._get_str_obs(np.array([i, j])), 'green', highlight=True)
else:
x_str += self._get_str_obs(np.array([i, j]))
x_str += "\n"
x_str = label + x_str
return x_str
else:
assert False
"""Return a string representation of the input tape/grid."""
raise NotImplementedError
def _render(self, mode='human', close=False):
if close:
@@ -99,34 +120,25 @@ class AlgorithmicEnv(Env):
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
inp = "Total length of input instance: %d, step: %d\n" % (self.total_len, self.time)
inp = "Total length of input instance: %d, step: %d\n" % (self.input_width, self.time)
outfile.write(inp)
x, y, action = self.x, self.y, self.last_action
x, y, action = self.read_head_position, self.write_head_position, self.last_action
if action is not None:
inp_act, out_act, pred = action
outfile.write("=" * (len(inp) - 1) + "\n")
y_str = "Output Tape : "
target_str = "Targets : "
if action is not None:
if self.chars:
pred_str = chr(pred + ord('A'))
else:
pred_str = str(pred)
pred_str = self.charmap[pred]
x_str = self._render_observation()
max_len = int(self.total_reward) + 1
for i in range(-2, max_len):
if i not in self.target:
y_str += " "
continue
for i in range(-2, len(self.target) + 2):
target_str += self._get_str_target(i)
if i < y - 1:
y_str += self._get_str_target(i)
elif i == (y - 1):
if action is not None and out_act == 1:
if pred == self.target[i]:
y_str += colorize(pred_str, 'green', highlight=True)
else:
y_str += colorize(pred_str, 'red', highlight=True)
color = 'green' if pred == self.target[i] else 'red'
y_str += colorize(pred_str, color, highlight=True)
else:
y_str += self._get_str_target(i)
outfile.write(x_str)
@@ -134,77 +146,185 @@ class AlgorithmicEnv(Env):
outfile.write(target_str + "\n\n")
if action is not None:
outfile.write("Current reward : %.3f\n" % self.reward)
outfile.write("Cumulative reward : %.3f\n" % self.sum_reward)
move = ""
if inp_act == 0:
move = "left"
elif inp_act == 1:
move = "right"
elif inp_act == 2:
move += "up"
elif inp_act == 3:
move += "down"
outfile.write("Current reward : %.3f\n" % self.last_reward)
outfile.write("Cumulative reward : %.3f\n" % self.episode_total_reward)
move = self.MOVEMENTS[inp_act]
outfile.write("Action : Tuple(move over input: %s,\n" % move)
if out_act == 1:
out_act = "True"
else:
out_act = "False"
out_act = out_act == 1
outfile.write(" write to the output tape: %s,\n" % out_act)
outfile.write(" prediction: %s)\n" % pred_str)
else:
outfile.write("\n" * 5)
return outfile
@property
def input_width(self):
return len(self.input_data)
def _step(self, action):
assert self.action_space.contains(action)
self.last_action = action
inp_act, out_act, pred = action
done = False
reward = 0.0
# We are outside the sample.
self.time += 1
if self.y not in self.target:
reward = -10.0
done = True
else:
assert 0 <= self.write_head_position
if out_act == 1:
if pred == self.target[self.y]:
try:
correct = pred == self.target[self.write_head_position]
except IndexError:
logger.warn("It looks like you're calling step() even though this "+
"environment has already returned done=True. You should always call "+
"reset() once you receive done=True. Any further steps are undefined "+
"behaviour.")
correct = False
if correct:
reward = 1.0
else:
# Bail as soon as a wrong character is written to the tape
reward = -0.5
done = True
self.y += 1
if self.y not in self.target:
self.write_head_position += 1
if self.write_head_position >= len(self.target):
done = True
if inp_act == 0:
self.x[0] -= 1
elif inp_act == 1:
self.x[0] += 1
elif inp_act == 2:
self.x[1] -= 1
elif inp_act == 3:
self.x[1] += 1
if self.time > self.total_len + self.total_reward + 4:
self._move(inp_act)
if self.time > self.time_limit:
reward = -1.0
done = True
obs = self._get_obs()
self.reward = reward
self.sum_reward += reward
self.last_reward = reward
self.episode_total_reward += reward
return (obs, reward, done, {})
@property
def time_limit(self):
"""If an agent takes more than this many timesteps, end the episode
immediately and return a negative reward."""
# (Seemingly arbitrary)
return self.input_width + len(self.target) + 4
def _check_levelup(self):
"""Called between episodes. Update our running record of episode rewards
and, if appropriate, 'level up' minimum input length."""
if self.episode_total_reward is None:
# This is before the first episode/call to reset(). Nothing to do
return
AlgorithmicEnv.reward_shortfalls.append(self.episode_total_reward - len(self.target))
AlgorithmicEnv.reward_shortfalls = AlgorithmicEnv.reward_shortfalls[-self.last:]
if len(AlgorithmicEnv.reward_shortfalls) == self.last and \
min(AlgorithmicEnv.reward_shortfalls) >= self.MIN_REWARD_SHORTFALL_FOR_PROMOTION and \
AlgorithmicEnv.min_length < 30:
AlgorithmicEnv.min_length += 1
AlgorithmicEnv.reward_shortfalls = []
def _reset(self):
self._check_levelup()
self.last_action = None
self.x = np.zeros(self.inp_dim).astype(np.int)
self.y = 0
AlgorithmicEnv.sum_rewards.append(self.sum_reward - self.total_reward)
AlgorithmicEnv.sum_rewards = AlgorithmicEnv.sum_rewards[-self.last:]
if len(AlgorithmicEnv.sum_rewards) == self.last and \
min(AlgorithmicEnv.sum_rewards) >= -1.0 and \
AlgorithmicEnv.current_length < 30:
AlgorithmicEnv.current_length += 1
AlgorithmicEnv.sum_rewards = []
self.sum_reward = 0.0
self.last_reward = 0
self.read_head_position = self.READ_HEAD_START
self.write_head_position = 0
self.episode_total_reward = 0.0
self.time = 0
self.total_len = self.np_random.randint(3) + AlgorithmicEnv.current_length
self.set_data()
length = self.np_random.randint(3) + AlgorithmicEnv.min_length
self.input_data = self.generate_input_data(length)
self.target = self.target_from_input_data(self.input_data)
return self._get_obs()
def generate_input_data(self, size):
raise NotImplementedError
def target_from_input_data(self, input_data):
raise NotImplementedError("Subclasses must implement")
def _move(self, movement):
raise NotImplementedError
class TapeAlgorithmicEnv(AlgorithmicEnv):
"""An algorithmic env with a 1-d input tape."""
MOVEMENTS = ['left', 'right']
READ_HEAD_START = 0
def _move(self, movement):
named = self.MOVEMENTS[movement]
self.read_head_position += 1 if named == 'right' else -1
def _get_obs(self, pos=None):
if pos is None:
pos = self.read_head_position
if pos < 0:
return self.base
try:
return self.input_data[pos]
except IndexError:
return self.base
def generate_input_data(self, size):
return [self.np_random.randint(self.base) for _ in range(size)]
def _render_observation(self):
x = self.read_head_position
x_str = "Observation Tape : "
for i in range(-2, self.input_width + 2):
if i == x:
x_str += colorize(self._get_str_obs(np.array([i])), 'green', highlight=True)
else:
x_str += self._get_str_obs(np.array([i]))
x_str += "\n"
return x_str
class GridAlgorithmicEnv(AlgorithmicEnv):
"""An algorithmic env with a 2-d input grid."""
MOVEMENTS = ['left', 'right', 'up', 'down']
READ_HEAD_START = (0, 0)
def __init__(self, rows, *args, **kwargs):
self.rows = rows
AlgorithmicEnv.__init__(self, *args, **kwargs)
def _move(self, movement):
named = self.MOVEMENTS[movement]
x, y = self.read_head_position
if named == 'left':
x -= 1
elif named == 'right':
x += 1
elif named == 'up':
y -= 1
elif named == 'down':
y += 1
else:
raise ValueError("Unrecognized direction: {}".format(named))
self.read_head_position = x, y
def generate_input_data(self, size):
return [
[self.np_random.randint(self.base) for _ in range(self.rows)]
for __ in range(size)
]
def _get_obs(self, pos=None):
if pos is None:
pos = self.read_head_position
x, y = pos
if any(idx < 0 for idx in pos):
return self.base
try:
return self.input_data[x][y]
except IndexError:
return self.base
def _render_observation(self):
x = self.read_head_position
label = "Observation Grid : "
x_str = ""
for j in range(-1, self.rows+1):
if j != -1:
x_str += " " * len(label)
for i in range(-2, self.input_width + 2):
if i == x[0] and j == x[1]:
x_str += colorize(self._get_str_obs((i, j)), 'green', highlight=True)
else:
x_str += self._get_str_obs((i, j))
x_str += "\n"
x_str = label + x_str
return x_str
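A short sketch of the "spooky action at a distance" flagged in the TODO above (not part of the commit; it only uses the constructors shown in this file): min_length lives on AlgorithmicEnv itself, so constructing a second env with a different starting_min_length silently changes the input lengths every other algorithmic env will generate.

from gym.envs.algorithmic import algorithmic_env
from gym.envs.algorithmic.copy_ import CopyEnv
from gym.envs.algorithmic.reverse import ReverseEnv

a = CopyEnv()     # __init__ sets AlgorithmicEnv.min_length = 2
b = ReverseEnv()  # passes starting_min_length=1, overwriting the shared class attribute
print(algorithmic_env.AlgorithmicEnv.min_length)  # 1, which `a` will also use on its next reset()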

View File

@@ -1,22 +0,0 @@
"""
Task is to copy content from the input tape to
the output tape. http://arxiv.org/abs/1511.07275
"""
import numpy as np
from gym.envs.algorithmic import algorithmic_env
from gym.envs.algorithmic.algorithmic_env import ha
class CopyEnv(algorithmic_env.AlgorithmicEnv):
def __init__(self, base=5):
algorithmic_env.AlgorithmicEnv.__init__(self,
inp_dim=1,
base=base,
chars=True)
def set_data(self):
self.content = {}
self.target = {}
for i in range(self.total_len):
val = self.np_random.randint(self.base)
self.content[ha(np.array([i]))] = val
self.target[i] = val
self.total_reward = self.total_len

View File

@@ -0,0 +1,14 @@
"""
Task is to copy content from the input tape to
the output tape. http://arxiv.org/abs/1511.07275
"""
import numpy as np
from gym.envs.algorithmic import algorithmic_env
class CopyEnv(algorithmic_env.TapeAlgorithmicEnv):
def __init__(self, base=5, chars=True):
super(CopyEnv, self).__init__(base=base, chars=chars)
def target_from_input_data(self, input_data):
return input_data

View File

@@ -1,26 +1,25 @@
"""
Task is to return every second character from the input tape.
Task is to return every nth character from the input tape.
http://arxiv.org/abs/1511.07275
"""
from __future__ import division
import numpy as np
from gym.envs.algorithmic import algorithmic_env
from gym.envs.algorithmic.algorithmic_env import ha
class DuplicatedInputEnv(algorithmic_env.AlgorithmicEnv):
class DuplicatedInputEnv(algorithmic_env.TapeAlgorithmicEnv):
def __init__(self, duplication=2, base=5):
self.duplication = duplication
algorithmic_env.AlgorithmicEnv.__init__(self,
inp_dim=1,
base=base,
chars=True)
def set_data(self):
self.content = {}
self.target = {}
copies = int(self.total_len / self.duplication)
for i in range(copies):
val = self.np_random.randint(self.base)
self.target[i] = val
for d in range(self.duplication):
self.content[ha(np.array([i * self.duplication + d]))] = val
self.total_reward = self.total_len / self.duplication
super(DuplicatedInputEnv, self).__init__(base=base, chars=True)
def generate_input_data(self, size):
res = []
if size < self.duplication:
size = self.duplication
for i in range(size//self.duplication):
char = self.np_random.randint(self.base)
for _ in range(self.duplication):
res.append(char)
return res
def target_from_input_data(self, input_data):
return [input_data[i] for i in range(0, len(input_data), self.duplication)]
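A quick illustration of the two methods above, using the class exactly as defined in this diff (example values chosen arbitrarily):

from gym.envs.algorithmic.duplicated_input import DuplicatedInputEnv

env = DuplicatedInputEnv(duplication=3, base=5)
print(env.target_from_input_data([4, 4, 4, 2, 2, 2]))  # [4, 2]: every 3rd character
print(len(env.generate_input_data(7)))                 # 6: size rounds down to a multiple of 3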

View File

@@ -1,27 +1,16 @@
"""
Task is to copy content multiple-times from the input tape to
Task is to copy content multiple times from the input tape to
the output tape. http://arxiv.org/abs/1511.07275
"""
import numpy as np
from gym.envs.algorithmic import algorithmic_env
from gym.envs.algorithmic.algorithmic_env import ha
class RepeatCopyEnv(algorithmic_env.AlgorithmicEnv):
class RepeatCopyEnv(algorithmic_env.TapeAlgorithmicEnv):
MIN_REWARD_SHORTFALL_FOR_PROMOTION = -.1
def __init__(self, base=5):
algorithmic_env.AlgorithmicEnv.__init__(self,
inp_dim=1,
base=base,
chars=True)
super(RepeatCopyEnv, self).__init__(base=base, chars=True)
self.last = 50
def set_data(self):
self.content = {}
self.target = {}
unique = set()
for i in range(self.total_len):
val = self.np_random.randint(self.base)
self.content[ha(np.array([i]))] = val
self.target[i] = val
self.target[2 * self.total_len - i - 1] = val
self.target[2 * self.total_len + i] = val
self.total_reward = 3.0 * self.total_len + 0.9
def target_from_input_data(self, input_data):
return input_data + list(reversed(input_data)) + input_data

View File

@@ -5,22 +5,12 @@ http://arxiv.org/abs/1511.07275
import numpy as np
from gym.envs.algorithmic import algorithmic_env
from gym.envs.algorithmic.algorithmic_env import ha
class ReverseEnv(algorithmic_env.AlgorithmicEnv):
class ReverseEnv(algorithmic_env.TapeAlgorithmicEnv):
MIN_REWARD_SHORTFALL_FOR_PROMOTION = -.1
def __init__(self, base=2):
algorithmic_env.AlgorithmicEnv.__init__(self,
inp_dim=1,
base=base,
chars=True)
algorithmic_env.AlgorithmicEnv.current_length = 1
super(ReverseEnv, self).__init__(base=base, chars=True, starting_min_length=1)
self.last = 50
def set_data(self):
self.content = {}
self.target = {}
for i in range(self.total_len):
val = self.np_random.randint(self.base)
self.content[ha(np.array([i]))] = val
self.target[self.total_len - i - 1] = val
self.total_reward = self.total_len + 0.9
def target_from_input_data(self, input_str):
return list(reversed(input_str))

View File

@@ -1,27 +1,30 @@
from __future__ import division
import numpy as np
from gym.envs.algorithmic import algorithmic_env
from gym.envs.algorithmic.algorithmic_env import ha
class ReversedAdditionEnv(algorithmic_env.AlgorithmicEnv):
class ReversedAdditionEnv(algorithmic_env.GridAlgorithmicEnv):
def __init__(self, rows=2, base=3):
self.rows = rows
algorithmic_env.AlgorithmicEnv.__init__(self,
inp_dim=2,
base=base,
chars=False)
def set_data(self):
self.content = {}
self.target = {}
super(ReversedAdditionEnv, self).__init__(rows=rows, base=base, chars=False)
def target_from_input_data(self, input_strings):
curry = 0
for i in range(self.total_len):
vals = []
for k in range(self.rows):
val = self.np_random.randint(self.base)
self.content[ha(np.array([i, k]))] = val
vals.append(val)
total = sum(vals) + curry
self.target[i] = total % self.base
curry = total / self.base
target = []
for digits in input_strings:
total = sum(digits) + curry
target.append(total % self.base)
curry = total // self.base
if curry > 0:
self.target[self.total_len] = curry
self.total_reward = self.total_len
target.append(curry)
return target
@property
def time_limit(self):
# Quirk preserved for the sake of consistency: add the length of the input
# rather than the length of the desired output (which may differ if there's
# an extra carried digit).
# TODO: It seems like this time limit is so strict as to make Addition3-v0
# unsolvable, since agents aren't even given enough time steps to look at
# all the digits. (The solutions on the scoreboard seem to only work by
# save-scumming.)
return self.input_width*2 + 4
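A back-of-the-envelope check of that TODO (illustrative numbers, not part of the diff): with three rows, the limit is already too tight once inputs reach five columns.

rows, width = 3, 5                             # e.g. Addition3-v0 after one promotion
time_limit = width * 2 + 4                     # 14, per the property above
moves_to_see_every_digit = rows * width - 1    # 14 moves; the read head starts on one cell
steps_needed = moves_to_see_every_digit + 1    # at least one more step to write the final digit
print(steps_needed > time_limit)               # True: the episode times out before it can be solved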

View File

View File

@@ -0,0 +1,239 @@
from gym.envs import algorithmic as alg
import unittest
# All concrete subclasses of AlgorithmicEnv
ALL_ENVS = [
alg.copy_.CopyEnv,
alg.duplicated_input.DuplicatedInputEnv,
alg.repeat_copy.RepeatCopyEnv,
alg.reverse.ReverseEnv,
alg.reversed_addition.ReversedAdditionEnv,
]
ALL_TAPE_ENVS = [env for env in ALL_ENVS
if issubclass(env, alg.algorithmic_env.TapeAlgorithmicEnv)]
ALL_GRID_ENVS = [env for env in ALL_ENVS
if issubclass(env, alg.algorithmic_env.GridAlgorithmicEnv)]
def imprint(env, input_arr):
"""Monkey-patch the given environment so that when reset() is called, the
input tape/grid will be set to the given data, rather than being randomly
generated."""
env.generate_input_data = lambda _: input_arr
class TestAlgorithmicEnvInteractions(unittest.TestCase):
"""Test some generic behaviour not specific to any particular algorithmic
environment. Movement, allocation of rewards, etc."""
CANNED_INPUT = [0, 1]
ENV_KLS = alg.copy_.CopyEnv
LEFT, RIGHT = ENV_KLS._movement_idx('left'), ENV_KLS._movement_idx('right')
def setUp(self):
self.env = self.ENV_KLS(base=2, chars=True)
imprint(self.env, self.CANNED_INPUT)
def test_successful_interaction(self):
obs = self.env.reset()
self.assertEqual(obs, 0)
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 0])
self.assertEqual(obs, 1)
self.assertGreater(reward, 0)
self.assertFalse(done)
obs, reward, done, _ = self.env.step([self.LEFT, 1, 1])
self.assertTrue(done)
self.assertGreater(reward, 0)
def test_bad_output_fail_fast(self):
obs = self.env.reset()
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 1])
self.assertTrue(done)
self.assertLess(reward, 0)
def test_levelup(self):
obs = self.env.reset()
# Kind of a hack
alg.algorithmic_env.AlgorithmicEnv.reward_shortfalls = []
min_length = self.env.min_length
for i in range(self.env.last):
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 0])
self.assertFalse(done)
obs, reward, done, _ = self.env.step([self.RIGHT, 1, 1])
self.assertTrue(done)
self.env.reset()
if i < self.env.last-1:
self.assertEqual(len(alg.algorithmic_env.AlgorithmicEnv.reward_shortfalls), i+1)
else:
# Should have leveled up on the last iteration
self.assertEqual(self.env.min_length, min_length+1)
self.assertEqual(len(alg.algorithmic_env.AlgorithmicEnv.reward_shortfalls), 0)
def test_walk_off_the_end(self):
obs = self.env.reset()
# Walk off the end
obs, r, done, _ = self.env.step([self.LEFT, 0, 0])
self.assertEqual(obs, self.env.base)
self.assertEqual(r, 0)
self.assertFalse(done)
# Walk further off track
obs, r, done, _ = self.env.step([self.LEFT, 0, 0])
self.assertEqual(obs, self.env.base)
self.assertFalse(done)
# Return to the first input character
obs, r, done, _ = self.env.step([self.RIGHT, 0, 0])
self.assertEqual(obs, self.env.base)
self.assertFalse(done)
obs, r, done, _ = self.env.step([self.RIGHT, 0, 0])
self.assertEqual(obs, 0)
def test_grid_navigation(self):
env = alg.reversed_addition.ReversedAdditionEnv(rows=2, base=6)
N,S,E,W = [env._movement_idx(named_dir) for named_dir in ['up', 'down', 'right', 'left']]
# Corresponds to a grid that looks like...
# 0 1 2
# 3 4 5
canned = [ [0, 3], [1, 4], [2, 5] ]
imprint(env, canned)
obs = env.reset()
self.assertEqual(obs, 0)
navigation = [
(S, 3), (N, 0), (E, 1), (S, 4), (S, 6), (E, 6), (N, 5), (N, 2), (W, 1)
]
for (movement, expected_obs) in navigation:
obs, reward, done, _ = env.step([movement, 0, 0])
self.assertEqual(reward, 0)
self.assertFalse(done)
self.assertEqual(obs, expected_obs)
def test_grid_success(self):
env = alg.reversed_addition.ReversedAdditionEnv(rows=2, base=3)
canned = [ [1, 2], [1, 0], [2, 2] ]
imprint(env, canned)
obs = env.reset()
target = [0, 2, 1, 1]
self.assertEqual(env.target, target)
self.assertEqual(obs, 1)
for i, target_digit in enumerate(target):
obs, reward, done, _ = env.step([0, 1, target_digit])
self.assertGreater(reward, 0)
self.assertEqual(done, i==len(target)-1)
def test_sane_time_limit(self):
obs = self.env.reset()
self.assertLess(self.env.time_limit, 100)
for _ in range(100):
obs, r, done, _ = self.env.step([self.LEFT, 0, 0])
if done:
return
self.fail("Time limit wasn't enforced")
def test_rendering(self):
env = self.env
obs = env.reset()
self.assertEqual(env._get_str_obs(), 'A')
self.assertEqual(env._get_str_obs(1), 'B')
self.assertEqual(env._get_str_obs(-1), ' ')
self.assertEqual(env._get_str_obs(2), ' ')
self.assertEqual(env._get_str_target(0), 'A')
self.assertEqual(env._get_str_target(1), 'B')
# Test numerical alphabet rendering
env = self.ENV_KLS(base=3, chars=False)
imprint(env, self.CANNED_INPUT)
env.reset()
self.assertEqual(env._get_str_obs(), '0')
self.assertEqual(env._get_str_obs(1), '1')
class TestTargets(unittest.TestCase):
"""Test the rules mapping input strings/grids to target outputs."""
def test_reverse_target(self):
input_expected = [
([0], [0]),
([0, 1], [1, 0]),
([1, 1], [1, 1]),
([1, 0, 1], [1, 0, 1]),
([0, 0, 1, 1], [1, 1, 0, 0]),
]
env = alg.reverse.ReverseEnv()
for input_arr, expected in input_expected:
target = env.target_from_input_data(input_arr)
self.assertEqual(target, expected)
def test_reversed_addition_target(self):
env = alg.reversed_addition.ReversedAdditionEnv(base=3)
input_expected = [
([[1,1], [1,1]], [2, 2]),
([[2,2], [0,1]], [1, 2]),
([[2,1], [1,1], [1,1], [1,0]], [0, 0, 0, 2]),
]
for (input_grid, expected_target) in input_expected:
self.assertEqual(env.target_from_input_data(input_grid), expected_target)
def test_reversed_addition_3rows(self):
env = alg.reversed_addition.ReversedAdditionEnv(base=3, rows=3)
input_expected = [
([[1,1,0],[0,1,1]], [2, 2]),
([[1,1,2],[0,1,1]], [1,0,1]),
]
for (input_grid, expected_target) in input_expected:
self.assertEqual(env.target_from_input_data(input_grid), expected_target)
def test_copy_target(self):
env = alg.copy_.CopyEnv()
self.assertEqual(env.target_from_input_data([0, 1, 2]), [0, 1, 2])
def test_duplicated_input_target(self):
env = alg.duplicated_input.DuplicatedInputEnv(duplication=2)
self.assertEqual(env.target_from_input_data([0, 0, 0, 0, 1, 1]), [0, 0, 1])
def test_repeat_copy_target(self):
env = alg.repeat_copy.RepeatCopyEnv()
self.assertEqual(env.target_from_input_data([0, 1, 2]), [0, 1, 2, 2, 1, 0, 0, 1, 2])
class TestInputGeneration(unittest.TestCase):
"""Test random input generation.
"""
def test_tape_inputs(self):
for env_kls in ALL_TAPE_ENVS:
env = env_kls()
for size in range(2,5):
input_tape = env.generate_input_data(size)
self.assertTrue(all(0<=x<=env.base for x in input_tape),
"Invalid input tape from env {}: {}".format(env_kls, input_tape))
# DuplicatedInput needs to generate inputs with even length,
# so it may be short one
self.assertLessEqual(len(input_tape), size)
def test_grid_inputs(self):
for env_kls in ALL_GRID_ENVS:
env = env_kls()
for size in range(2, 5):
input_grid = env.generate_input_data(size)
# Should get "size" sublists, each of length self.rows (not the
# opposite, as you might expect)
self.assertEqual(len(input_grid), size)
self.assertTrue(all(len(col) == env.rows for col in input_grid))
self.assertTrue(all(0<=x<=env.base for x in input_grid[0]))
def test_duplicatedinput_inputs(self):
"""The duplicated_input env needs to generate strings with the appropriate
amount of repetition."""
env = alg.duplicated_input.DuplicatedInputEnv(duplication=2)
input_tape = env.generate_input_data(4)
self.assertEqual(len(input_tape), 4)
self.assertEqual(input_tape[0], input_tape[1])
self.assertEqual(input_tape[2], input_tape[3])
# If requested input size isn't a multiple of duplication, go lower
input_tape = env.generate_input_data(3)
self.assertEqual(len(input_tape), 2)
self.assertEqual(input_tape[0], input_tape[1])
# If requested input size is *less than* duplication, go up
input_tape = env.generate_input_data(1)
self.assertEqual(len(input_tape), 2)
self.assertEqual(input_tape[0], input_tape[1])
env = alg.duplicated_input.DuplicatedInputEnv(duplication=3)
input_tape = env.generate_input_data(6)
self.assertEqual(len(input_tape), 6)
self.assertEqual(input_tape[0], input_tape[1])
self.assertEqual(input_tape[1], input_tape[2])
if __name__ == '__main__':
unittest.main()

File diff suppressed because it is too large