Remove DiscreteEnv class (#2514)

Carlos Luis authored on 2021-12-22 19:25:36 +01:00, committed by GitHub
parent 4fe7efaacd
commit 102cd1bf4c
5 changed files with 95 additions and 93 deletions

File: gym/envs/toy_text/cliffwalking.py

@@ -1,8 +1,11 @@
-import numpy as np
 import sys
 from contextlib import closing
 from io import StringIO
-from gym.envs.toy_text import discrete
+from typing import Optional
+import numpy as np
+from gym import Env, spaces
+from gym.envs.toy_text.utils import categorical_sample
 UP = 0
 RIGHT = 1
@@ -10,7 +13,7 @@ DOWN = 2
 LEFT = 3
-class CliffWalkingEnv(discrete.DiscreteEnv):
+class CliffWalkingEnv(Env):
     """
     This is a simple implementation of the Gridworld Cliff
     reinforcement learning task.
@@ -37,29 +40,30 @@ class CliffWalkingEnv(discrete.DiscreteEnv):
         self.shape = (4, 12)
         self.start_state_index = np.ravel_multi_index((3, 0), self.shape)
-        nS = np.prod(self.shape)
-        nA = 4
+        self.nS = np.prod(self.shape)
+        self.nA = 4
         # Cliff Location
         self._cliff = np.zeros(self.shape, dtype=np.bool)
         self._cliff[3, 1:-1] = True
         # Calculate transition probabilities and rewards
-        P = {}
-        for s in range(nS):
+        self.P = {}
+        for s in range(self.nS):
             position = np.unravel_index(s, self.shape)
-            P[s] = {a: [] for a in range(nA)}
-            P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
-            P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
-            P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
-            P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])
+            self.P[s] = {a: [] for a in range(self.nA)}
+            self.P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
+            self.P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
+            self.P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
+            self.P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])
         # Calculate initial state distribution
         # We always start in state (3, 0)
-        isd = np.zeros(nS)
-        isd[self.start_state_index] = 1.0
+        self.initial_state_distrib = np.zeros(self.nS)
+        self.initial_state_distrib[self.start_state_index] = 1.0

-        super().__init__(nS, nA, P, isd)
+        self.observation_space = spaces.Discrete(self.nS)
+        self.action_space = spaces.Discrete(self.nA)
     def _limit_coordinates(self, coord):
         """
@@ -90,6 +94,20 @@ class CliffWalkingEnv(discrete.DiscreteEnv):
         is_done = tuple(new_position) == terminal_state
         return [(1.0, new_state, -1, is_done)]
+    def step(self, a):
+        transitions = self.P[self.s][a]
+        i = categorical_sample([t[0] for t in transitions], self.np_random)
+        p, s, r, d = transitions[i]
+        self.s = s
+        self.lastaction = a
+        return (int(s), r, d, {"prob": p})
+    def reset(self, seed: Optional[int] = None):
+        super().reset(seed=seed)
+        self.s = categorical_sample(self.initial_state_distrib, self.np_random)
+        self.lastaction = None
+        return int(self.s)
     def render(self, mode="human"):
         outfile = StringIO() if mode == "ansi" else sys.stdout
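For orientation only (not part of the commit), a minimal usage sketch of the refactored environment; the public API is unchanged, only the DiscreteEnv base class is gone. The module path gym.envs.toy_text.cliffwalking is assumed from the class name:

    from gym.envs.toy_text.cliffwalking import CliffWalkingEnv  # assumed module path

    env = CliffWalkingEnv()
    obs = env.reset(seed=0)                 # start state 36, i.e. grid cell (3, 0)
    obs, reward, done, info = env.step(1)   # action 1 == RIGHT; stepping into the cliff costs -100
    print(obs, reward, done, info["prob"])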

File: gym/envs/toy_text/discrete.py (deleted)

@@ -1,57 +0,0 @@
-from typing import Optional
-import numpy as np
-from gym import Env, spaces
-from gym.utils import seeding
-def categorical_sample(prob_n, np_random):
-    """
-    Sample from categorical distribution
-    Each row specifies class probabilities
-    """
-    prob_n = np.asarray(prob_n)
-    csprob_n = np.cumsum(prob_n)
-    return (csprob_n > np_random.random()).argmax()
-class DiscreteEnv(Env):
-    """
-    Has the following members
-    - nS: number of states
-    - nA: number of actions
-    - P: transitions (*)
-    - isd: initial state distribution (**)
-    (*) dictionary of lists, where
-      P[s][a] == [(probability, nextstate, reward, done), ...]
-    (**) list or array of length nS
-    """
-    def __init__(self, nS, nA, P, isd):
-        self.P = P
-        self.isd = isd
-        self.lastaction = None  # for rendering
-        self.nS = nS
-        self.nA = nA
-        self.action_space = spaces.Discrete(self.nA)
-        self.observation_space = spaces.Discrete(self.nS)
-    def reset(self, seed: Optional[int] = None):
-        super().reset(seed=seed)
-        self.s = categorical_sample(self.isd, self.np_random)
-        self.lastaction = None
-        return int(self.s)
-    def step(self, a):
-        transitions = self.P[self.s][a]
-        i = categorical_sample([t[0] for t in transitions], self.np_random)
-        p, s, r, d = transitions[i]
-        self.s = s
-        self.lastaction = a
-        return (int(s), r, d, {"prob": p})
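The removed docstring above is the only place the transition-table layout was spelled out. Here is a standalone sketch (not from the commit) of the P[s][a] format and the sampling rule that step() relies on; all names below are illustrative:

    import numpy as np

    # Toy 2-state, 1-action table in the P[s][a] == [(probability, nextstate, reward, done), ...] format.
    P = {
        0: {0: [(0.7, 0, 0.0, False), (0.3, 1, 1.0, True)]},
        1: {0: [(1.0, 1, 0.0, True)]},
    }

    rng = np.random.default_rng(0)
    transitions = P[0][0]
    i = (np.cumsum([t[0] for t in transitions]) > rng.random()).argmax()  # same rule as categorical_sample
    prob, next_state, reward, done = transitions[i]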

File: gym/envs/toy_text/frozen_lake.py

@@ -1,11 +1,11 @@
 import sys
 from contextlib import closing
+from io import StringIO
+from typing import Optional
 import numpy as np
-from io import StringIO
-from gym import utils
-from gym.envs.toy_text import discrete
+from gym import Env, spaces, utils
+from gym.envs.toy_text.utils import categorical_sample
 LEFT = 0
 DOWN = 1
@@ -63,7 +63,7 @@ def generate_random_map(size=8, p=0.8):
     return ["".join(x) for x in res]
-class FrozenLakeEnv(discrete.DiscreteEnv):
+class FrozenLakeEnv(Env):
     """
     Winter is here. You and your friends were tossing around a frisbee at the
     park when you made a wild throw that left the frisbee out in the middle of
@@ -103,10 +103,10 @@ class FrozenLakeEnv(discrete.DiscreteEnv):
         nA = 4
         nS = nrow * ncol
-        isd = np.array(desc == b"S").astype("float64").ravel()
-        isd /= isd.sum()
+        self.initial_state_distrib = np.array(desc == b"S").astype("float64").ravel()
+        self.initial_state_distrib /= self.initial_state_distrib.sum()

-        P = {s: {a: [] for a in range(nA)} for s in range(nS)}
+        self.P = {s: {a: [] for a in range(nA)} for s in range(nS)}
         def to_s(row, col):
             return row * ncol + col
@@ -134,7 +134,7 @@ class FrozenLakeEnv(discrete.DiscreteEnv):
             for col in range(ncol):
                 s = to_s(row, col)
                 for a in range(4):
-                    li = P[s][a]
+                    li = self.P[s][a]
                     letter = desc[row, col]
                     if letter in b"GH":
                         li.append((1.0, s, 0, True))
@@ -147,7 +147,22 @@ class FrozenLakeEnv(discrete.DiscreteEnv):
                         else:
                             li.append((1.0, *update_probability_matrix(row, col, a)))
-        super().__init__(nS, nA, P, isd)
+        self.observation_space = spaces.Discrete(nS)
+        self.action_space = spaces.Discrete(nA)
+    def step(self, a):
+        transitions = self.P[self.s][a]
+        i = categorical_sample([t[0] for t in transitions], self.np_random)
+        p, s, r, d = transitions[i]
+        self.s = s
+        self.lastaction = a
+        return (int(s), r, d, {"prob": p})
+    def reset(self, seed: Optional[int] = None):
+        super().reset(seed=seed)
+        self.s = categorical_sample(self.initial_state_distrib, self.np_random)
+        self.lastaction = None
+        return int(self.s)
     def render(self, mode="human"):
         outfile = StringIO() if mode == "ansi" else sys.stdout

File: gym/envs/toy_text/taxi.py

@@ -1,9 +1,11 @@
 import sys
 from contextlib import closing
 from io import StringIO
-from gym import utils
-from gym.envs.toy_text import discrete
+from typing import Optional
 import numpy as np
+from gym import Env, spaces, utils
+from gym.envs.toy_text.utils import categorical_sample
 MAP = [
     "+---------+",
@@ -16,7 +18,7 @@ MAP = [
 ]
-class TaxiEnv(discrete.DiscreteEnv):
+class TaxiEnv(Env):
     """
     The Taxi Problem
     from "Hierarchical Reinforcement Learning with the MAXQ Value Function Decomposition"
@@ -81,9 +83,9 @@ class TaxiEnv(discrete.DiscreteEnv):
         num_columns = 5
         max_row = num_rows - 1
         max_col = num_columns - 1
-        initial_state_distrib = np.zeros(num_states)
+        self.initial_state_distrib = np.zeros(num_states)
         num_actions = 6
-        P = {
+        self.P = {
            state: {action: [] for action in range(num_actions)}
            for state in range(num_states)
        }
@@ -93,7 +95,7 @@ class TaxiEnv(discrete.DiscreteEnv):
                    for dest_idx in range(len(locs)):
                        state = self.encode(row, col, pass_idx, dest_idx)
                        if pass_idx < 4 and pass_idx != dest_idx:
-                            initial_state_distrib[state] += 1
+                            self.initial_state_distrib[state] += 1
                        for action in range(num_actions):
                            # defaults
                            new_row, new_col, new_pass_idx = row, col, pass_idx
@@ -128,11 +130,10 @@ class TaxiEnv(discrete.DiscreteEnv):
                            new_state = self.encode(
                                new_row, new_col, new_pass_idx, dest_idx
                            )
-                            P[state][action].append((1.0, new_state, reward, done))
-        initial_state_distrib /= initial_state_distrib.sum()
-        discrete.DiscreteEnv.__init__(
-            self, num_states, num_actions, P, initial_state_distrib
-        )
+                            self.P[state][action].append((1.0, new_state, reward, done))
+        self.initial_state_distrib /= self.initial_state_distrib.sum()
+        self.action_space = spaces.Discrete(num_actions)
+        self.observation_space = spaces.Discrete(num_states)
     def encode(self, taxi_row, taxi_col, pass_loc, dest_idx):
         # (5) 5, 5, 4
@@ -157,6 +158,20 @@ class TaxiEnv(discrete.DiscreteEnv):
         assert 0 <= i < 5
         return reversed(out)
+    def step(self, a):
+        transitions = self.P[self.s][a]
+        i = categorical_sample([t[0] for t in transitions], self.np_random)
+        p, s, r, d = transitions[i]
+        self.s = s
+        self.lastaction = a
+        return (int(s), r, d, {"prob": p})
+    def reset(self, seed: Optional[int] = None):
+        super().reset(seed=seed)
+        self.s = categorical_sample(self.initial_state_distrib, self.np_random)
+        self.lastaction = None
+        return int(self.s)
     def render(self, mode="human"):
         outfile = StringIO() if mode == "ansi" else sys.stdout
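As an aside (not part of the diff), the encode/decode pair referenced above packs (row, col, passenger_location, destination) into one of Taxi's 5*5*5*4 = 500 discrete states as a mixed-radix number. A standalone sketch of the same arithmetic:

    # Mirrors TaxiEnv.encode/decode conceptually; standalone illustration only.
    def encode(taxi_row, taxi_col, pass_loc, dest_idx):
        return ((taxi_row * 5 + taxi_col) * 5 + pass_loc) * 4 + dest_idx

    def decode(i):
        dest_idx = i % 4
        i //= 4
        pass_loc = i % 5
        i //= 5
        taxi_col = i % 5
        taxi_row = i // 5
        return taxi_row, taxi_col, pass_loc, dest_idx

    assert decode(encode(3, 1, 2, 0)) == (3, 1, 2, 0)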

File: gym/envs/toy_text/utils.py (new file)

@@ -0,0 +1,11 @@
+import numpy as np
+
+
+def categorical_sample(prob_n, np_random):
+    """
+    Sample from categorical distribution
+    Each row specifies class probabilities
+    """
+    prob_n = np.asarray(prob_n)
+    csprob_n = np.cumsum(prob_n)
+    return (csprob_n > np_random.random()).argmax()
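A quick usage sketch (not part of the commit) of the shared helper, using the module path shown in the imports above; np_random can be any RNG object exposing .random():

    import numpy as np
    from gym.envs.toy_text.utils import categorical_sample

    rng = np.random.default_rng(42)
    counts = [0, 0, 0]
    for _ in range(10_000):
        counts[categorical_sample([0.2, 0.5, 0.3], rng)] += 1
    print(counts)  # counts come out roughly proportional to [0.2, 0.5, 0.3]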