# point_robot.py
import matplotlib.pyplot as plt
import numpy as np
import torch
from gym import spaces
from gym import Env
from torchkit import pytorch_utils as ptu
from matplotlib.patches import Rectangle


class PointEnv(Env):
    """
    point robot on a 2-D plane with position control
    tasks (aka goals) are positions on the plane

    - tasks sampled from unit square
    - reward is L2 distance
    """

    def __init__(self,
                 max_episode_steps=20,
                 n_tasks=2,
                 modify_init_state_dist=True,
                 on_circle_init_state=True,
                 **kwargs):
        self._max_episode_steps = max_episode_steps
        self.step_count = 0
        self.modify_init_state_dist = modify_init_state_dist
        self.on_circle_init_state = on_circle_init_state

        # np.random.seed(1337)
        # sample goal positions uniformly from the unit square
        goals = [[np.random.uniform(-1., 1.), np.random.uniform(-1., 1.)] for _ in range(n_tasks)]
        self.goals = goals

        self.reset_task(0)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,))
        self.action_space = spaces.Box(low=-0.1, high=0.1, shape=(2,))

    def reset_task(self, idx):
        ''' reset goal AND reset the agent '''
        if idx is not None:
            self._goal = np.array(self.goals[idx])
        self.reset()

    def set_goal(self, goal):
        self._goal = np.asarray(goal)

    def get_all_task_idx(self):
        return range(len(self.goals))

    def reset_model(self):
        # reset to a random location on the unit square
        self._state = np.random.uniform(-1., 1., size=(2,))
        return self._get_obs()

    def reset(self):
        self.step_count = 0
        return self.reset_model()

    def _get_obs(self):
        return np.copy(self._state)

    def step(self, action):
        self._state = self._state + action
        # dense reward: negative L2 distance to the goal
        reward = - ((self._state[0] - self._goal[0]) ** 2 + (self._state[1] - self._goal[1]) ** 2) ** 0.5
        # check if maximum step limit is reached
        self.step_count += 1
        done = self.step_count >= self._max_episode_steps
        ob = self._get_obs()
        return ob, reward, done, dict()

    def reward(self, state, action=None):
        return - ((state[0] - self._goal[0]) ** 2 + (state[1] - self._goal[1]) ** 2) ** 0.5

    def viewer_setup(self):
        print('no viewer')

    def render(self):
        print('current state:', self._state)
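

# Example usage (an illustrative sketch; the helper name _demo_point_env is
# introduced here for illustration only and is not part of the original module):
# sample tasks, pick one, and step with random actions; the reward returned by
# step() is the negative L2 distance to the active goal.
def _demo_point_env(num_steps=5):
    env = PointEnv(max_episode_steps=20, n_tasks=2)
    env.reset_task(0)                    # activate the first sampled goal and reset the agent
    obs = env.reset()
    for _ in range(num_steps):
        action = env.action_space.sample()    # random action in [-0.1, 0.1]^2
        obs, reward, done, info = env.step(action)
        print('obs:', obs, 'reward:', reward, 'done:', done)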


class SparsePointEnv(PointEnv):
    '''
    - tasks sampled from unit half-circle
    - reward is L2 distance given only within goal radius

    NOTE: `step()` computes the dense reward internally but returns the sparse
    reward; the sparse reward is also stored in the info dict under
    'sparse_reward'. `sparsify_rewards()` converts dense rewards to sparse ones.
    '''

    def __init__(self,
                 max_episode_steps=20,
                 n_tasks=2,
                 goal_radius=0.2,
                 modify_init_state_dist=True,
                 on_circle_init_state=True,
                 **kwargs):
        super().__init__(max_episode_steps, n_tasks)
        self.goal_radius = goal_radius
        self.modify_init_state_dist = modify_init_state_dist
        self.on_circle_init_state = on_circle_init_state

        # np.random.seed(1337)
        # sample goals on the unit half-circle (radius 1, angles in [0, pi])
        radius = 1.0
        angles = np.random.uniform(0, np.pi, size=n_tasks)
        xs = radius * np.cos(angles)
        ys = radius * np.sin(angles)
        goals = np.stack([xs, ys], axis=1)
        np.random.shuffle(goals)
        goals = goals.tolist()

        self.goals = goals
        self.reset_task(0)

    def sparsify_rewards(self, r):
        ''' zero out rewards when outside the goal radius '''
        mask = (r >= -self.goal_radius).astype(np.float32)
        r = r * mask
        return r
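
    # Worked example: with the default goal_radius of 0.2, a dense reward of
    # -0.15 (distance 0.15, inside the radius) satisfies r >= -0.2 and passes
    # through unchanged, while a dense reward of -0.5 (outside the radius) is
    # masked to 0. step() below additionally replaces the in-radius value with
    # a constant bonus of +1.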

    def reset_model(self):
        self.step_count = 0
        if self.modify_init_state_dist:
            # start anywhere in an enlarged box around the half-circle
            self._state = np.array([np.random.uniform(-1.5, 1.5), np.random.uniform(-0.5, 1.5)])
            if not self.on_circle_init_state:  # make sure initial state is not on semi-circle
                while 1 - self.goal_radius <= np.linalg.norm(self._state) <= 1 + self.goal_radius:
                    self._state = np.array([np.random.uniform(-1.5, 1.5), np.random.uniform(-0.5, 1.5)])
        else:
            self._state = np.array([0, 0])
        return self._get_obs()

    def step(self, action):
        ob, reward, done, d = super().step(action)
        sparse_reward = self.sparsify_rewards(reward)
        # make sparse rewards positive within the goal radius
        if reward >= -self.goal_radius:
            sparse_reward = 1
        d.update({'sparse_reward': sparse_reward})
        return ob, sparse_reward, done, d

    def reward(self, state, action=None):
        return self.sparsify_rewards(super().reward(state, action))

    def is_goal_state(self):
        return np.linalg.norm(self._state - self._goal) <= self.goal_radius

    def plot_env(self):
        ax = plt.gca()
        # plot half circle and goal position
        angles = np.linspace(0, np.pi, num=100)
        x, y = np.cos(angles), np.sin(angles)
        plt.plot(x, y, color='k')
        # fix visualization
        plt.axis('scaled')
        ax.set_xlim(-2, 2)
        ax.set_ylim(-1, 2)
        plt.xticks([])
        plt.yticks([])
        circle = plt.Circle((self._goal[0], self._goal[1]), radius=self.goal_radius, alpha=0.3)
        ax.add_artist(circle)

    def plot_behavior(self, observations, plot_env=True, **kwargs):
        if plot_env:  # whether to draw the half-circle and goal (they may already be plotted)
            self.plot_env()
        # visualise behaviour, current position, goal
        plt.plot(observations[:, 0], observations[:, 1], **kwargs)
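

# Minimal usage sketch (an illustrative assumption about how the env is driven,
# not part of the original module): roll out a random policy in SparsePointEnv,
# collect the observations, and plot the trajectory on top of the half-circle
# of candidate goals via plot_behavior().
if __name__ == '__main__':
    env = SparsePointEnv(max_episode_steps=20, n_tasks=5, goal_radius=0.2)
    env.reset_task(0)                    # pick the first sampled goal
    observations = [env.reset()]
    done = False
    while not done:
        obs, sparse_reward, done, info = env.step(env.action_space.sample())
        observations.append(obs)
    env.plot_behavior(np.array(observations), color='b', label='random policy')
    plt.legend()
    plt.show()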