mirror of
https://github.com/Farama-Foundation/Gymnasium.git
synced 2025-08-02 14:26:33 +00:00
170 lines
5.4 KiB
Python
170 lines
5.4 KiB
Python
import sys
|
|
from contextlib import closing
|
|
|
|
import numpy as np
|
|
from io import StringIO
|
|
|
|
from gym import utils
|
|
from gym.envs.toy_text import discrete
|
|
|
|
# Action identifiers. The numeric order matters: each action's cyclic
# neighbours (a - 1 and a + 1, modulo 4) are the two directions perpendicular
# to it, and the values are also used as list indices when rendering.
LEFT, DOWN, RIGHT, UP = range(4)
|
|
|
|
# Built-in layouts, keyed by name. Each layout is a list of equal-length row
# strings, one character per tile:
#   S = start, F = frozen surface, H = hole, G = goal.
MAPS = {
    "4x4": [
        "SFFF",
        "FHFH",
        "FFFH",
        "HFFG",
    ],
    "8x8": [
        "SFFFFFFF",
        "FFFFFFFF",
        "FFFHFFFF",
        "FFFFFHFF",
        "FFFHFFFF",
        "FHHFFFHF",
        "FHFFHFHF",
        "FFFHFFFG",
    ],
}
|
|
|
|
|
|
def generate_random_map(size=8, p=0.8):
    """Generates a random valid map (one that has a path from start to goal)

    Every tile is drawn independently: frozen ("F") with probability ``p``,
    hole ("H") otherwise. The top-left tile is then overwritten with the
    start ("S") and the bottom-right tile with the goal ("G"). Maps are
    redrawn until a path of non-hole tiles connects start and goal.

    :param size: size of each side of the grid; must be at least 2
    :param p: probability that a tile is frozen; must be greater than 0,
        values above 1 are clamped to 1
    :return: list of ``size`` strings of ``size`` characters each
    :raises ValueError: if ``size < 2`` or ``p <= 0``. With those inputs no
        valid map can ever be drawn, so the retry loop would never finish.
    """
    if size < 2:
        # A 1x1 grid has its single tile overwritten by "G", so the search
        # from (0, 0) can never see a goal neighbour.
        raise ValueError("size must be at least 2, got {!r}".format(size))
    if p <= 0:
        # With no frozen tiles both neighbours of the start are holes.
        raise ValueError("p must be greater than 0, got {!r}".format(p))

    # Clamp once; the value does not change between retries.
    p = min(1, p)
    directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]

    # DFS to check that it's a valid path.
    def is_valid(res):
        frontier, discovered = [(0, 0)], set()
        while frontier:
            r, c = frontier.pop()
            if (r, c) in discovered:
                continue
            discovered.add((r, c))
            for x, y in directions:
                r_new = r + x
                c_new = c + y
                if not (0 <= r_new < size and 0 <= c_new < size):
                    continue
                tile = res[r_new][c_new]
                if tile == "G":
                    return True
                if tile != "H":
                    frontier.append((r_new, c_new))
        return False

    # Rejection sampling: redraw the whole grid until it is solvable.
    while True:
        res = np.random.choice(["F", "H"], (size, size), p=[p, 1 - p])
        res[0][0] = "S"
        res[-1][-1] = "G"
        if is_valid(res):
            return ["".join(x) for x in res]
|
|
|
|
|
|
class FrozenLakeEnv(discrete.DiscreteEnv):
    """
    Winter is here. You and your friends were tossing around a frisbee at the
    park when you made a wild throw that left the frisbee out in the middle of
    the lake. The water is mostly frozen, but there are a few holes where the
    ice has melted. If you step into one of those holes, you'll fall into the
    freezing water. At this time, there's an international frisbee shortage, so
    it's absolutely imperative that you navigate across the lake and retrieve
    the disc. However, the ice is slippery, so you won't always move in the
    direction you intend.
    The surface is described using a grid like the following

        SFFF
        FHFH
        FFFH
        HFFG

    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located

    The episode ends when you reach the goal or fall in a hole.
    You receive a reward of 1 if you reach the goal, and zero otherwise.

    States are numbered row-major (``row * ncol + col``) and the four actions
    are LEFT, DOWN, RIGHT and UP.
    """

    metadata = {"render.modes": ["human", "ansi"]}

    def __init__(self, desc=None, map_name="4x4", is_slippery=True):
        """Build the transition table for the given map.

        :param desc: the map as a sequence of equal-length row strings; when
            None the map is chosen through ``map_name``
        :param map_name: key into ``MAPS`` used when ``desc`` is None; when
            both are None a random map is generated
        :param is_slippery: when true, an action moves in the intended
            direction or in either perpendicular direction, each with
            probability 1/3; when false moves are deterministic
        """
        if desc is None and map_name is None:
            desc = generate_random_map()
        elif desc is None:
            desc = MAPS[map_name]
        # dtype="c" yields a 2-D array of single bytes, one per tile.
        self.desc = desc = np.asarray(desc, dtype="c")
        self.nrow, self.ncol = nrow, ncol = desc.shape
        self.reward_range = (0, 1)

        nA = 4
        nS = nrow * ncol

        # Initial state distribution: uniform over all "S" tiles.
        isd = np.array(desc == b"S").astype("float64").ravel()
        isd /= isd.sum()

        # P[state][action] -> list of (probability, next_state, reward, done)
        P = {s: {a: [] for a in range(nA)} for s in range(nS)}

        def to_s(row, col):
            # Row-major state index.
            return row * ncol + col

        def inc(row, col, a):
            # Move one step in direction `a`, staying put at the grid edge.
            if a == LEFT:
                col = max(col - 1, 0)
            elif a == DOWN:
                row = min(row + 1, nrow - 1)
            elif a == RIGHT:
                col = min(col + 1, ncol - 1)
            elif a == UP:
                row = max(row - 1, 0)
            return (row, col)

        def update_probability_matrix(row, col, action):
            # Outcome of actually moving in `action` from (row, col).
            newrow, newcol = inc(row, col, action)
            newstate = to_s(newrow, newcol)
            newletter = desc[newrow, newcol]
            done = bytes(newletter) in b"GH"
            reward = float(newletter == b"G")
            return newstate, reward, done

        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                # The tile does not depend on the action, so look it up once.
                terminal = bytes(desc[row, col]) in b"GH"
                for a in range(nA):
                    li = P[s][a]
                    if terminal:
                        # Goal and holes are absorbing with zero reward.
                        li.append((1.0, s, 0, True))
                    elif is_slippery:
                        # Cyclic neighbours of `a` are the perpendicular
                        # directions (see the action constants' ordering).
                        for b in [(a - 1) % nA, a, (a + 1) % nA]:
                            li.append(
                                (1.0 / 3.0, *update_probability_matrix(row, col, b))
                            )
                    else:
                        li.append((1.0, *update_probability_matrix(row, col, a)))

        super().__init__(nS, nA, P, isd)

    def render(self, mode="human"):
        """Draw the grid with the current tile highlighted.

        Reads ``self.s`` (current state) and ``self.lastaction``, which are
        not set in this class (presumably maintained by the base class).

        :param mode: "human" writes to ``sys.stdout`` and returns None;
            "ansi" returns the rendering as a string
        :raises ValueError: for any other mode. Previously such a mode wrote
            to ``sys.stdout`` and then closed it while failing.
        """
        if mode not in ("human", "ansi"):
            raise ValueError(
                "Unsupported render mode: {!r} (expected 'human' or 'ansi')".format(
                    mode
                )
            )
        outfile = StringIO() if mode == "ansi" else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode("utf-8") for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write(
                " ({})\n".format(["Left", "Down", "Right", "Up"][self.lastaction])
            )
        else:
            outfile.write("\n")
        outfile.write("\n".join("".join(line) for line in desc) + "\n")

        # Only the in-memory buffer is closed; sys.stdout must stay open.
        if mode == "ansi":
            with closing(outfile):
                return outfile.getvalue()
|