2018-02-02 23:01:45 -08:00
|
|
|
import gym
|
2017-09-05 08:49:43 -07:00
|
|
|
from collections import OrderedDict
|
|
|
|
|
2018-02-02 23:01:45 -08:00
|
|
|
class Dict(gym.Space):
    """
    A dictionary of simpler spaces.

    The sub-spaces are stored in ``self.spaces`` as an ``OrderedDict``. A plain
    ``dict`` passed to the constructor is sorted by key first; an
    ``OrderedDict`` or a list of ``(key, space)`` pairs keeps the order it was
    given in.

    Example usage:
    self.observation_space = spaces.Dict({"position": spaces.Discrete(2), "velocity": spaces.Discrete(3)})

    Example usage [nested]:
    self.nested_observation_space = spaces.Dict({
        'sensors': spaces.Dict({
            'position': spaces.Box(low=-100, high=100, shape=(3,)),
            'velocity': spaces.Box(low=-1, high=1, shape=(3,)),
            'front_cam': spaces.Tuple((
                spaces.Box(low=0, high=1, shape=(10, 10, 3)),
                spaces.Box(low=0, high=1, shape=(10, 10, 3))
            )),
            'rear_cam': spaces.Box(low=0, high=1, shape=(10, 10, 3)),
        }),
        'ext_controller': spaces.MultiDiscrete([ [0,4], [0,1], [0,1] ]),
        'inner_state':spaces.Dict({
            'charge': spaces.Discrete(100),
            'system_checks': spaces.MultiBinary(10),
            'job_status': spaces.Dict({
                'task': spaces.Discrete(5),
                'progress': spaces.Box(low=0, high=100, shape=()),
            })
        })
    })
    """

    def __init__(self, spaces):
        """Build the space from a mapping (or list of pairs) of key -> sub-space.

        Args:
            spaces: a ``dict``, an ``OrderedDict``, or a list of
                ``(key, space)`` pairs.
        """
        if isinstance(spaces, dict) and not isinstance(spaces, OrderedDict):
            # A plain dict is sorted by key so the resulting order does not
            # depend on the dict's own iteration order.
            spaces = OrderedDict(sorted(spaces.items()))
        if isinstance(spaces, list):
            spaces = OrderedDict(spaces)
        self.spaces = spaces
        gym.Space.__init__(self, None, None)  # None for shape and dtype, since it'll require special handling

    def sample(self):
        """Return an ``OrderedDict`` holding one sample from every sub-space."""
        return OrderedDict([(k, space.sample()) for k, space in self.spaces.items()])

    def contains(self, x):
        """Return True if ``x`` is a dict with exactly one valid entry per sub-space."""
        if not isinstance(x, dict) or len(x) != len(self.spaces):
            return False
        # Sizes match, so checking that every expected key is present also
        # rules out unexpected extra keys.
        return all(
            k in x and space.contains(x[k])
            for k, space in self.spaces.items()
        )

    def __repr__(self):
        # str(k) rather than k: keys are not required to be strings, and
        # concatenating e.g. an int key with ":" would raise TypeError.
        items = ", ".join([str(k) + ":" + str(s) for k, s in self.spaces.items()])
        return "Dict(" + items + ")"

    def to_jsonable(self, sample_n):
        """Convert a sequence of samples into a dict of per-key serialized columns.

        Args:
            sample_n: an iterable of samples, each indexable by every key of
                this space.

        Returns:
            A dict mapping each key to the owning sub-space's ``to_jsonable``
            output for the list of that key's values across all samples.
        """
        # serialize as dict-repr of vectors
        return {
            key: space.to_jsonable([sample[key] for sample in sample_n])
            for key, space in self.spaces.items()
        }

    def from_jsonable(self, sample_n):
        """Inverse of ``to_jsonable``: turn a dict of columns into a list of samples.

        Args:
            sample_n: a mapping from each key of this space to that key's
                serialized column.

        Returns:
            A list of dicts, one per sample. The number of samples is taken
            from the last sub-space's decoded column; an empty list is
            returned when this space has no sub-spaces.
        """
        dict_of_list = {}
        # Column that determines how many samples are rebuilt. It stays empty
        # when there are no sub-spaces, so the result is [] instead of an
        # error caused by reading an unbound loop variable.
        reference_column = []
        for key, space in self.spaces.items():
            reference_column = dict_of_list[key] = space.from_jsonable(sample_n[key])

        return [
            {key: column[i] for key, column in dict_of_list.items()}
            for i, _ in enumerate(reference_column)
        ]
|
2018-02-08 12:53:47 -08:00
|
|
|
|