This repository has been archived by the owner on Feb 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjumping_task.py
378 lines (330 loc) · 15.1 KB
/
jumping_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
#!/usr/bin/env python
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
import argparse
import numpy as np
import pygame
import time
# ---------------------------------------------------------------------------
# Colors
# ---------------------------------------------------------------------------
# RGB triples are used when drawing with pygame; the greyscale intensities
# are the values written into the numpy observation.
RGB_BLACK = (0, 0, 0)
RGB_GREY = (128, 128, 128)
RGB_WHITE = (255, 255, 255)

GREYSCALE_GREY = 0.5
GREYSCALE_WHITE = 1.0

# ---------------------------------------------------------------------------
# Jump parameters
# ---------------------------------------------------------------------------
# A jump traces a "hat": a diagonal going up until JUMP_HEIGHT is reached,
# followed by a diagonal going back down.
JUMP_HEIGHT = 15
JUMP_HORIZONTAL_SPEED = 1
JUMP_VERTICAL_SPEED = 1

# ---------------------------------------------------------------------------
# Obstacle positions
# ---------------------------------------------------------------------------
# OBSTACLE_MIN_POSITION is the minimum x position on the screen where the
# single obstacle can be placed.
OBSTACLE_MIN_POSITION = 15

# Fixed x positions of the two obstacles placed on the floor when
# two_obstacles is set to True in the environment. They are constrained by
# the shape of the jump and serve as a form of ultimate generalization test.
OBSTACLE_1 = 20
OBSTACLE_2 = 55
class JumpTaskEnv(object):
    """Environment for the jumping task.

    The agent walks along a floor and must jump over one (or two) obstacles
    to reach the right edge of the screen. Positions are expressed in pixels
    with x growing to the right and y growing upwards from the bottom row of
    the internal (x, y) grid.

    Args:
      scr_w: screen width, by default 60 pixels
      scr_h: screen height, by default 60 pixels
      floor_height: the height of the floor in pixels, by default 10 pixels
      agent_w: agent width, by default 5 pixels
      agent_h: agent height, by default 10 pixels
      agent_init_pos: initial x position of the agent (on the floor), defaults to the left of the screen
      agent_speed: agent lateral speed, measured in pixels per time step, by default 1 pixel
      obstacle_position: x offset of the obstacle (on the floor), by default 30 pixels.
        OBSTACLE_MIN_POSITION is added to it to obtain the on-screen x position,
        so 0 is the leftmost allowed placement
      obstacle_size: width and height of the obstacle, by default (9, 10)
      rendering: display the game screen, by default False
      zoom: zoom applied to the screen when rendering, by default 8
      slow_motion: if True, sleeps for 0.1 seconds at each time step.
        Allows to watch the game at "human" speed when played by the agent, by default False
      with_left_action: if True, the left action is allowed, by default False
      max_number_of_steps: the maximum number of steps for an episode, by default 600.
      two_obstacles: puts two obstacles on the floor at a given location.
        The ultimate generalization test, by default False
      finish_jump: perform a full jump when the jump action is selected.
        Otherwise an action needs to be selected as usual, by default False
    """

    def __init__(self, scr_w=60, scr_h=60, floor_height=10,
                 agent_w=5, agent_h=10, agent_init_pos=0, agent_speed=1,
                 obstacle_position=30, obstacle_size=(9, 10),
                 rendering=False, zoom=8, slow_motion=False, with_left_action=False,
                 max_number_of_steps=600, two_obstacles=False, finish_jump=False):
        # Reward given when touching an obstacle ('life') and bonus given
        # when leaving the screen on the right ('exit').
        self.rewards = {'life': -1, 'exit': 1}

        self.scr_w = scr_w
        self.scr_h = scr_h
        # get_state() returns an array of shape (rows, columns), i.e.
        # (height, width); keep the advertised shape in agreement with it.
        self.state_shape = [scr_h, scr_w]

        self.rendering = rendering
        self.zoom = zoom
        if rendering:
            self.screen = pygame.display.set_mode((zoom*scr_w, zoom*scr_h))

        # 0: right, 1: jump, 2: left (optional)
        self.legal_actions = [0, 1, 2] if with_left_action else [0, 1]
        self.nb_actions = len(self.legal_actions)

        self.agent_speed = agent_speed
        self.agent_current_speed = 0
        # [is the agent in the air, direction of the jump ("up"/"down"/None)]
        self.jumping = [False, None]
        self.agent_init_pos = agent_init_pos
        self.agent_size = [agent_w, agent_h]
        self.obstacle_size = obstacle_size
        self.step_id = 0
        self.slow_motion = slow_motion
        self.max_number_of_steps = max_number_of_steps
        self.finish_jump = finish_jump

        self.reset(obstacle_position, floor_height, two_obstacles)

        # Define gym env objects.
        # This is only for compatibility with codebases that expect a gym
        # env, so any failure (gym missing, incompatible version) is ignored.
        # `except Exception` still lets KeyboardInterrupt/SystemExit through.
        try:
            from gym import spaces
            self.observation_space = spaces.Box(low=0, high=1, shape=(self.state_shape))
            self.action_space = spaces.Discrete(self.nb_actions)
        except Exception:
            pass

    def _obstacle_positions(self):
        """Returns the x positions of the obstacles currently on the floor."""
        if self.two_obstacles:
            return (OBSTACLE_1, OBSTACLE_2)
        return (self.obstacle_position,)

    def _overlaps_agent(self, obstacle_x):
        """Returns True if the obstacle standing at obstacle_x intersects the agent."""
        obstacle_y = self.floor_height
        overlap_x = (obstacle_x + self.obstacle_size[0] > self.agent_pos_x
                     and obstacle_x < self.agent_pos_x + self.agent_size[0])
        overlap_y = (obstacle_y + self.obstacle_size[1] > self.agent_pos_y
                     and obstacle_y < self.agent_pos_y + self.agent_size[1])
        return overlap_x and overlap_y

    def _game_status(self):
        """Returns two booleans stating whether the agent is touching the obstacle(s) (failure)
        and whether the agent has reached the right end of the screen (success).

        Also updates self.done and, when enabled, renders the screen and
        sleeps for the slow motion effect.
        """
        failure = any(self._overlaps_agent(x) for x in self._obstacle_positions())
        success = self.scr_w < self.agent_pos_x + self.agent_size[0]

        self.done = failure or success

        if self.rendering:
            self.render()
            if self.slow_motion:
                time.sleep(0.1)

        return failure, success

    def _continue_jump(self):
        """Updates the position of the agent while jumping.

        Needs to be called at each discrete step of the jump.
        """
        # Keep the lateral speed the agent had when the jump started, but
        # never leave the screen on the left.
        self.agent_pos_x = max(self.agent_pos_x + self.agent_current_speed, 0)

        # Top of the "hat" reached: start going down.
        if self.agent_pos_y > self.floor_height + JUMP_HEIGHT:
            self.jumping[1] = "down"

        if self.jumping[1] == "up":
            self.agent_pos_y += self.agent_speed * JUMP_VERTICAL_SPEED
        elif self.jumping[1] == "down":
            self.agent_pos_y -= self.agent_speed * JUMP_VERTICAL_SPEED
            # Back on the floor: the jump is over.
            if self.agent_pos_y == self.floor_height:
                self.jumping[0] = False

    def reset(self, obstacle_position=30, floor_height=10, two_obstacles=False):
        """Resets the game.

        To be called at the beginning of each episode.
        Allows to set different obstacle positions and floor heights.

        Args:
          obstacle_position: the x offset of the obstacle for the new game,
            added to OBSTACLE_MIN_POSITION (ignored when two_obstacles is True)
          floor_height: the floor height for the new game
          two_obstacles: whether to switch to a two obstacles environment

        Returns:
          The initial state, as returned by get_state().
        """
        self.floor_height = floor_height
        self.agent_pos_x = self.agent_init_pos
        self.agent_pos_y = self.floor_height
        self.agent_current_speed = 0
        self.jumping = [False, None]
        self.step_id = 0
        self.done = False
        self.two_obstacles = two_obstacles
        if not two_obstacles:
            self.obstacle_position = obstacle_position + OBSTACLE_MIN_POSITION
        return self.get_state()

    def close(self):
        """Exits the game and closes the rendering."""
        self.done = True
        if self.rendering:
            pygame.quit()

    def seed(self, seed=None):
        """Deterministic environment: the seed is returned but not used."""
        return [seed]

    def get_state(self):
        """Returns an np array of the screen in greyscale.

        The array has shape (scr_h, scr_w) and dtype float32.
        """
        # The screen is drawn in (x, y) coordinates and transposed at the end,
        # so the working buffer must be (width, height). Allocating it as
        # (height, width) only works for square screens.
        obs = np.zeros((self.scr_w, self.scr_h), dtype=np.float32)

        def _fill_rec(left, up, size, color):
            obs[left: left + size[0], up: up + size[1]] = color

        # Add agent and obstacles
        _fill_rec(self.agent_pos_x, self.agent_pos_y, self.agent_size, GREYSCALE_WHITE)
        for obstacle_x in self._obstacle_positions():
            _fill_rec(obstacle_x, self.floor_height, self.obstacle_size, GREYSCALE_GREY)

        # Draw the outline of the screen
        obs[0:self.scr_w, 0] = GREYSCALE_WHITE
        obs[0:self.scr_w, self.scr_h-1] = GREYSCALE_WHITE
        obs[0, 0:self.scr_h] = GREYSCALE_WHITE
        obs[self.scr_w-1, 0:self.scr_h] = GREYSCALE_WHITE

        # Draw the floor
        obs[0:self.scr_w, self.floor_height] = GREYSCALE_WHITE

        return obs.T

    def step(self, action):
        """Updates the game state based on the action selected.

        Returns the state as a greyscale numpy array, the reward obtained by the agent,
        a boolean stating whether the next state is terminal and an empty info dict.

        The reward is the net displacement of the agent to the right during
        the step (+1 per pixel). It is replaced by rewards['life'] when the
        agent touches an obstacle, and rewards['exit'] is added when the agent
        leaves the screen on the right.

        Args:
          action: the action to be taken by the agent
            (0: right, 1: jump, 2: left when with_left_action is set)

        Raises:
          ValueError: if the action is not one of self.legal_actions.
        """
        start_x = self.agent_pos_x

        if self.step_id > self.max_number_of_steps:
            print('You have reached the maximum number of steps.')
            self.done = True
            return self.get_state(), 0., self.done, {}
        if action not in self.legal_actions:
            raise ValueError(
                'We did not recognize that action. '
                'It should be an int in {}'.format(self.legal_actions))

        if self.jumping[0]:
            # While in the air the selected action is ignored.
            self._continue_jump()
        elif action == 0:  # right
            self.agent_pos_x += self.agent_speed
            self.agent_current_speed = self.agent_speed * JUMP_HORIZONTAL_SPEED
        elif action == 1:  # jump
            self.jumping = [True, "up"]
            self._continue_jump()
        elif action == 2:  # left, can only be taken if self.with_left_action is set to True
            if self.agent_pos_x > 0:
                # Clamp at the left border so a speed larger than the
                # remaining distance cannot yield a negative position.
                self.agent_pos_x = max(self.agent_pos_x - self.agent_speed, 0)
                self.agent_current_speed = -self.agent_speed * JUMP_HORIZONTAL_SPEED
            else:
                self.agent_current_speed = 0

        killed, exited = self._game_status()
        if self.finish_jump:
            # Continue jumping until jump is finished
            # Being in the air is marked by self.jumping[0]
            while self.jumping[0] and not killed and not exited:
                self._continue_jump()
                killed, exited = self._game_status()

        reward = self.agent_pos_x - start_x
        if killed:
            reward = self.rewards['life']
        elif exited:
            reward += self.rewards['exit']
        self.step_id += 1
        return self.get_state(), reward, self.done, {}

    def render(self):
        """Render the screen game using pygame.

        Does nothing when the environment was created with rendering=False.
        """
        if not self.rendering:
            return
        pygame.event.pump()
        self.screen.fill(RGB_BLACK)

        # pygame's y axis points down, hence the (scr_h - y) conversions.
        floor_y = self.zoom*(self.scr_h-self.floor_height)
        pygame.draw.line(self.screen, RGB_WHITE,
                         [0, floor_y], [self.zoom*self.scr_w, floor_y], 1)

        agent = pygame.Rect(self.zoom*self.agent_pos_x,
                            self.zoom*(self.scr_h-self.agent_pos_y-self.agent_size[1]),
                            self.zoom*self.agent_size[0],
                            self.zoom*self.agent_size[1])
        pygame.draw.rect(self.screen, RGB_WHITE, agent)

        for obstacle_x in self._obstacle_positions():
            obstacle = pygame.Rect(self.zoom*obstacle_x,
                                   self.zoom*(self.scr_h-self.floor_height-self.obstacle_size[1]),
                                   self.zoom*self.obstacle_size[0],
                                   self.zoom*self.obstacle_size[1])
            pygame.draw.rect(self.screen, RGB_GREY, obstacle)

        pygame.display.flip()
def test(args):
    """Plays the jumping task interactively with the keyboard.

    The right arrow moves right, the up arrow jumps, the left arrow moves
    left (only when args.with_left_action is set) and the 'e' key exits.
    The environment is always created with rendering and slow motion on.

    Args:
      args: the parsed command line options, forwarded to JumpTaskEnv
    """
    env = JumpTaskEnv(scr_w=args.scr_w, scr_h=args.scr_h, floor_height=args.floor_height,
                      agent_w=args.agent_w, agent_h=args.agent_h, agent_init_pos=args.agent_init_pos, agent_speed=args.agent_speed,
                      obstacle_position=args.obstacle_position, obstacle_size=args.obstacle_size,
                      rendering=True, zoom=args.zoom, slow_motion=True, with_left_action=args.with_left_action,
                      max_number_of_steps=args.max_number_of_steps, two_obstacles=args.two_obstacles, finish_jump=args.finish_jump)
    env.render()
    score = 0
    while not env.done:
        # With finish_jump the whole jump is resolved inside env.step(), so
        # an action is always read from the keyboard here.
        action = None
        for event in pygame.event.get():
            if event.type != pygame.KEYDOWN:
                continue
            if event.key == pygame.K_RIGHT:
                action = 0
            elif event.key == pygame.K_UP:
                action = 1
            elif event.key == pygame.K_LEFT and args.with_left_action:
                action = 2
            elif event.key == pygame.K_e:
                # close() marks the episode as done and shuts pygame down;
                # drop any pending action so nothing is stepped or rendered
                # on the closed display.
                env.close()
                action = None
                break
            else:
                action = 'unknown'
        if action is None:
            continue
        elif action == 'unknown':
            print('We did not recognize that action. Please use the arrows to move the agent or the \'e\' key to exit.')
            continue
        _, r, term, _ = env.step(action)
        env.render()
        score += r
        # The reward is a float when the step limit is hit, which the 'd'
        # format code rejects; cast explicitly.
        print('Agent position: {:2d} | Reward: {:2d} | Terminal: {}'.format(int(env.agent_pos_x), int(r), term))
    print('---------------')
    print('Final score: {:2d}'.format(int(score)))
    print('---------------')
if __name__ == '__main__':
    # Command line entry point: parse the environment options and start an
    # interactive, keyboard-controlled game.
    parser = argparse.ArgumentParser(description="Options to test the environment")
    parser.add_argument('--scr_w', type=int, default=60,
                        help='screen width, by default 60 pixels')
    parser.add_argument('--scr_h', type=int, default=60,
                        help='screen height, by default 60 pixels')
    parser.add_argument('--floor_height', type=int, default=10,
                        help='the y position of the floor in pixels, by default 10 pixels')
    parser.add_argument('--agent_w', type=int, default=5,
                        help='agent width, by default 5 pixels')
    parser.add_argument('--agent_h', type=int, default=10,
                        help='agent height, by default 10 pixels')
    parser.add_argument('--agent_init_pos', type=int, default=0,
                        help='initial x position of the agent(on the floor), defaults to the left of the screen')
    parser.add_argument('--agent_speed', type=int, default=1,
                        help='agent lateral speed, measured in pixels per time step, by default 1 pixel')
    parser.add_argument('--obstacle_position', type=int, default=0,
                        help='initial x position of the obstacle (on the floor), by default 0 pixels, which is the leftmost one')
    # Two integers are required (width then height): the environment indexes
    # obstacle_size[0] and obstacle_size[1], so a single int cannot be used.
    parser.add_argument('--obstacle_size', type=int, nargs=2, default=(9, 10),
                        metavar=('WIDTH', 'HEIGHT'),
                        help='width and height of the obstacle, by default(9, 10)')
    parser.add_argument('--zoom', type=int, default=8,
                        help='zoom applied to the screen when rendering, by default 8')
    parser.add_argument('--with_left_action', action='store_true',
                        help='flag, if present, the left action is allowed, by default False')
    parser.add_argument('--max_number_of_steps', type=int, default=600,
                        help='the maximum number of steps for an episode, by default 600.')
    parser.add_argument('--two_obstacles', action='store_true', help='flag, if present: puts two obstacles on the floor at a given location. ' +
                        'The ultimate generalization test, by default False')
    parser.add_argument('--finish_jump', action='store_true', help='flag, if present: perform a full jump when the jump action is selected. ' +
                        'Otherwise an action needs to be selected as usual, by default False')
    args = parser.parse_args()
    test(args)