27
27
28
28
import copy as copy
29
29
import logging
30
- import time
31
30
from typing import TYPE_CHECKING
32
31
33
32
import gymnasium
34
33
import numpy as np
34
+ from crazyflow import Sim
35
35
from gymnasium import spaces
36
36
from scipy .spatial .transform import Rotation as R
37
37
38
- from lsy_drone_racing .control . closing_controller import ClosingController
38
+ from lsy_drone_racing .sim . noise import NoiseList
39
39
from lsy_drone_racing .sim .physics import PhysicsMode
40
- from lsy_drone_racing .sim .sim import Sim
41
40
from lsy_drone_racing .utils import check_gate_pass
42
41
43
42
if TYPE_CHECKING :
@@ -81,8 +80,6 @@ class DroneRacingEnv(gymnasium.Env):
81
80
low-level controller.
82
81
"""
83
82
84
- CONTROLLER = "mellinger" # specifies controller type
85
-
86
83
def __init__ (self , config : dict ):
87
84
"""Initialize the DroneRacingEnv.
88
85
@@ -92,18 +89,20 @@ def __init__(self, config: dict):
92
89
super ().__init__ ()
93
90
self .config = config
94
91
self .sim = Sim (
95
- track = config .env .track ,
96
- sim_freq = config .sim .sim_freq ,
97
- ctrl_freq = config .sim .ctrl_freq ,
98
- disturbances = getattr (config .sim , "disturbances" , {}),
99
- randomization = getattr (config .env , "randomization" , {}),
100
- gui = config .sim .gui ,
92
+ n_worlds = 1 ,
101
93
n_drones = 1 ,
102
94
physics = config .sim .physics ,
95
+ control = "state" ,
96
+ freq = config .sim .sim_freq ,
97
+ state_freq = config .env .freq ,
98
+ attitude_freq = config .sim .attitude_freq ,
99
+ rng_key = config .env .seed ,
103
100
)
104
- self .sim .seed (config .env .seed )
101
+ self .contact_mask = np .array ([0 ], dtype = bool )
102
+ if config .sim .sim_freq % config .env .freq != 0 :
103
+ raise ValueError (f"({ config .sim .sim_freq = } ) is no multiple of ({ config .env .freq = } )" )
105
104
self .action_space = spaces .Box (low = - 1 , high = 1 , shape = (13 ,))
106
- n_gates , n_obstacles = len (self . sim . gates ), len (self . sim .obstacles )
105
+ n_gates , n_obstacles = len (config . env . track . gates ), len (config . env . track .obstacles )
107
106
self .observation_space = spaces .Dict (
108
107
{
109
108
"pos" : spaces .Box (low = - np .inf , high = np .inf , shape = (3 ,)),
@@ -120,10 +119,22 @@ def __init__(self, config: dict):
120
119
),
121
120
}
122
121
)
122
+ rpy_max = np .array ([85 / 180 * np .pi , 85 / 180 * np .pi , np .pi ], np .float32 ) # Yaw unbounded
123
+ pos_low , pos_high = np .array ([- 3 , - 3 , 0 ]), np .array ([3 , 3 , 2.5 ])
124
+ self .state_space = spaces .Dict (
125
+ {
126
+ "pos" : spaces .Box (low = pos_low , high = pos_high , dtype = np .float64 ),
127
+ "rpy" : spaces .Box (low = - rpy_max , high = rpy_max , dtype = np .float64 ),
128
+ "vel" : spaces .Box (low = - np .inf , high = np .inf , shape = (3 ,), dtype = np .float64 ),
129
+ "ang_vel" : spaces .Box (low = - np .inf , high = np .inf , shape = (3 ,), dtype = np .float64 ),
130
+ }
131
+ )
123
132
self .target_gate = 0
124
133
self .symbolic = self .sim .symbolic () if config .env .symbolic else None
125
134
self ._steps = 0
126
135
self ._last_drone_pos = np .zeros (3 )
136
+ self .gates , self .obstacles , self .drone = self .load_track (config .env .track )
137
+ self .disturbances = self .load_disturbances (config .env .get ("disturbances" , None ))
127
138
128
139
self .gates_visited = np .array ([False ] * len (config .env .track .gates ))
129
140
self .obstacles_visited = np .array ([False ] * len (config .env .track .obstacles ))
@@ -147,16 +158,20 @@ def reset(
147
158
if seed is not None :
148
159
self .sim .seed (seed )
149
160
self .sim .reset ()
161
+ states = self .sim .data .states .replace (
162
+ pos = self .drone ["pos" ].reshape ((1 , 1 , 3 )),
163
+ quat = self .drone ["quat" ].reshape ((1 , 1 , 4 )),
164
+ vel = self .drone ["vel" ].reshape ((1 , 1 , 3 )),
165
+ rpy_rates = self .drone ["rpy_rates" ].reshape ((1 , 1 , 3 )),
166
+ )
167
+ self .sim .data = self .sim .data .replace (states = states )
150
168
self .target_gate = 0
151
169
self ._steps = 0
152
- self .sim .drone .reset (self .sim .drone .pos , self .sim .drone .rpy , self .sim .drone .vel )
153
- self ._last_drone_pos [:] = self .sim .drone .pos
154
- if self .sim .n_drones > 1 :
155
- raise NotImplementedError ("Firmware wrapper does not support multiple drones." )
170
+ self ._last_drone_pos [:] = self .sim .data .states .pos [0 , 0 ]
156
171
info = self .info
157
- info ["sim_freq" ] = self .config . sim .sim_freq
158
- info ["low_level_ctrl_freq" ] = self .config . sim .ctrl_freq
159
- info ["drone_mass" ] = self .sim .drone . nominal_params .mass
172
+ info ["sim_freq" ] = self .sim .data . core . freq
173
+ info ["low_level_ctrl_freq" ] = self .sim .data . controls . attitude_freq
174
+ info ["drone_mass" ] = self .sim .default_data . params .mass [ 0 , 0 ]
160
175
info ["env_freq" ] = self .config .env .freq
161
176
return self .obs , info
162
177
@@ -177,83 +192,52 @@ def step(
177
192
), "sys_id model not supported for full state control interface"
178
193
action = action .astype (np .float64 ) # Drone firmware expects float64
179
194
assert action .shape == self .action_space .shape , f"Invalid action shape: { action .shape } "
180
- pos , vel , acc , yaw , rpy_rate = action [:3 ], action [3 :6 ], action [6 :9 ], action [9 ], action [10 :]
181
- self .sim .drone .full_state_cmd (pos , vel , acc , yaw , rpy_rate )
182
- collision = self ._inner_step_loop ()
183
- terminated = self .terminated or collision
184
- return self .obs , self .reward , terminated , False , self .info
185
-
186
- def _inner_step_loop (self ) -> bool :
187
- """Run the desired action for multiple simulation sub-steps.
195
+ self .sim .state_control (action .reshape ((1 , 1 , 13 )))
196
+ self .sim .step (self .sim .freq // self .sim .control_freq )
197
+ return self .obs , self .reward , self .terminated , False , self .info
188
198
189
- The outer controller is called at a lower frequency than the firmware loop. Each environment
190
- step therefore consists of multiple simulation steps. At each simulation step, the
191
- lower-level controller is called to compute the thrust and attitude commands.
192
-
193
- Returns:
194
- True if a collision occured at any point during the simulation steps, else False.
195
- """
196
- thrust = self .sim .drone .desired_thrust
197
- collision = False
198
- while (
199
- self .sim .drone .tick / self .sim .drone .firmware_freq
200
- < (self ._steps + 1 ) / self .config .env .freq
201
- ):
202
- self .sim .step (thrust )
203
- self .target_gate += self .gate_passed ()
204
- if self .target_gate == self .sim .n_gates :
205
- self .target_gate = - 1
206
- collision |= bool (self .sim .collisions )
207
- pos , rpy , vel = self .sim .drone .pos , self .sim .drone .rpy , self .sim .drone .vel
208
- thrust = self .sim .drone .step_controller (pos , rpy , vel )[::- 1 ]
209
- self .sim .drone .desired_thrust [:] = thrust
210
- self ._last_drone_pos [:] = self .sim .drone .pos
211
- self ._steps += 1
212
- return collision
199
+ def render (self ):
200
+ """Render the environment."""
201
+ self .sim .render ()
213
202
214
203
@property
215
204
def obs (self ) -> dict [str , NDArray [np .floating ]]:
216
205
"""Return the observation of the environment."""
217
206
obs = {
218
- "pos" : self .sim .drone . pos . astype ( np .float32 ),
219
- "rpy" : self .sim .drone . rpy .astype (np .float32 ),
220
- "vel" : self .sim .drone . vel . astype ( np .float32 ),
221
- "ang_vel" : self .sim .drone . ang_vel . astype ( np .float32 ),
207
+ "pos" : np . array ( self .sim .data . states . pos [ 0 , 0 ], dtype = np .float32 ),
208
+ "rpy" : R . from_quat ( self .sim .data . states . quat [ 0 , 0 ]). as_euler ( "xyz" ) .astype (np .float32 ),
209
+ "vel" : np . array ( self .sim .data . states . vel [ 0 , 0 ], dtype = np .float32 ),
210
+ "ang_vel" : np . array ( self .sim .data . states . rpy_rates [ 0 , 0 ], dtype = np .float32 ),
222
211
}
223
212
obs ["ang_vel" ][:] = R .from_euler ("xyz" , obs ["rpy" ]).apply (obs ["ang_vel" ], inverse = True )
224
213
225
- gates = self .sim .gates
226
- obs ["target_gate" ] = self .target_gate if self .target_gate < len (gates ) else - 1
214
+ obs ["target_gate" ] = self .target_gate if self .target_gate < len (self .gates ) else - 1
227
215
# Add the gate and obstacle poses to the info. If gates or obstacles are in sensor range,
228
216
# use the actual pose, otherwise use the nominal pose.
229
- in_range = self .sim .in_range (gates , self .sim .drone , self .config .env .sensor_range )
217
+ drone_pos = self .sim .data .states .pos [0 , 0 ]
218
+ dist = np .linalg .norm (drone_pos [:2 ] - self .gates ["nominal_pos" ][:, :2 ])
219
+ in_range = dist < self .config .env .sensor_range
230
220
self .gates_visited = np .logical_or (self .gates_visited , in_range )
231
221
232
- gates_pos = np .stack ([g ["nominal.pos" ] for g in gates .values ()])
233
- gates_pos [self .gates_visited ] = np .stack ([g ["pos" ] for g in gates .values ()])[
234
- self .gates_visited
235
- ]
236
- gates_rpy = np .stack ([g ["nominal.rpy" ] for g in gates .values ()])
237
- gates_rpy [self .gates_visited ] = np .stack ([g ["rpy" ] for g in gates .values ()])[
238
- self .gates_visited
239
- ]
222
+ gates_pos = self .gates ["nominal_pos" ].copy ()
223
+ gates_pos [self .gates_visited ] = self .gates ["pos" ][self .gates_visited ]
224
+ gates_rpy = self .gates ["nominal_rpy" ].copy ()
225
+ gates_rpy [self .gates_visited ] = self .gates ["rpy" ][self .gates_visited ]
240
226
obs ["gates_pos" ] = gates_pos .astype (np .float32 )
241
227
obs ["gates_rpy" ] = gates_rpy .astype (np .float32 )
242
228
obs ["gates_visited" ] = self .gates_visited
243
229
244
- obstacles = self . sim . obstacles
245
- in_range = self . sim . in_range ( obstacles , self . sim . drone , self .config .env .sensor_range )
230
+ dist = np . linalg . norm ( drone_pos [: 2 ] - self . obstacles [ "nominal_pos" ][:, : 2 ])
231
+ in_range = dist < self .config .env .sensor_range
246
232
self .obstacles_visited = np .logical_or (self .obstacles_visited , in_range )
247
233
248
- obstacles_pos = np .stack ([o ["nominal.pos" ] for o in obstacles .values ()])
249
- obstacles_pos [self .obstacles_visited ] = np .stack ([o ["pos" ] for o in obstacles .values ()])[
250
- self .obstacles_visited
251
- ]
234
+ obstacles_pos = self .obstacles ["nominal_pos" ].copy ()
235
+ obstacles_pos [self .obstacles_visited ] = self .obstacles ["pos" ][self .obstacles_visited ]
252
236
obs ["obstacles_pos" ] = obstacles_pos .astype (np .float32 )
253
237
obs ["obstacles_visited" ] = self .obstacles_visited
254
238
255
- if "observation" in self .sim . disturbances :
256
- obs = self .sim . disturbances ["observation" ].apply (obs )
239
+ if "observation" in self .disturbances :
240
+ obs = self .disturbances ["observation" ].apply (obs )
257
241
return obs
258
242
259
243
@property
@@ -278,11 +262,21 @@ def terminated(self) -> bool:
278
262
True if the drone is out of bounds, colliding with an obstacle, or has passed all gates,
279
263
else False.
280
264
"""
281
- state = {k : getattr (self .sim .drone , k ).copy () for k in self .sim .state_space }
282
- state ["ang_vel" ] = R .from_euler ("xyz" , state ["rpy" ]).apply (state ["ang_vel" ], inverse = True )
283
- if state not in self .sim .state_space :
265
+ quat = self .sim .data .states .quat [0 , 0 ]
266
+ state = {
267
+ "pos" : np .array (self .sim .data .states .pos [0 , 0 ], dtype = np .float32 ),
268
+ "rpy" : np .array (R .from_quat (quat ).as_euler ("xyz" ), dtype = np .float32 ),
269
+ "vel" : np .array (self .sim .data .states .vel [0 , 0 ], dtype = np .float32 ),
270
+ "ang_vel" : np .array (
271
+ R .from_quat (quat ).apply (self .sim .data .states .rpy_rates [0 , 0 ], inverse = True ),
272
+ dtype = np .float32 ,
273
+ ),
274
+ }
275
+ if state not in self .state_space :
284
276
return True # Drone is out of bounds
285
- if self .sim .collisions :
277
+ if np .logical_and (self .sim .contacts ("drone:0" ), self .contact_mask ).any ():
278
+ return True
279
+ if self .sim .data .states .pos [0 , 0 , 2 ] < 0.0 :
286
280
return True
287
281
if self .target_gate == - 1 : # Drone has passed all gates
288
282
return True
@@ -291,7 +285,30 @@ def terminated(self) -> bool:
291
285
@property
292
286
def info (self ) -> dict :
293
287
"""Return an info dictionary containing additional information about the environment."""
294
- return {"collisions" : self .sim .collisions , "symbolic_model" : self .symbolic }
288
+ return {"collisions" : self .sim .contacts ("drone:0" ), "symbolic_model" : self .symbolic }
289
+
290
+ def load_track (self , track : dict ) -> tuple [dict , dict , dict ]:
291
+ """Load the track from the config file."""
292
+ gate_pos = np .array ([g ["pos" ] for g in track .gates ])
293
+ gate_rpy = np .array ([g ["rpy" ] for g in track .gates ])
294
+ gates = {"pos" : gate_pos , "rpy" : gate_rpy , "nominal_pos" : gate_pos , "nominal_rpy" : gate_rpy }
295
+ obstacle_pos = np .array ([o ["pos" ] for o in track .obstacles ])
296
+ obstacles = {"pos" : obstacle_pos , "nominal_pos" : obstacle_pos }
297
+ drone = {
298
+ k : np .array ([track .drone .get (k )], dtype = np .float32 )
299
+ for k in ("pos" , "rpy" , "vel" , "rpy_rates" )
300
+ }
301
+ drone ["quat" ] = R .from_euler ("xyz" , drone ["rpy" ]).as_quat ()
302
+ return gates , obstacles , drone
303
+
304
+ def load_disturbances (self , disturbances : dict | None = None ) -> dict :
305
+ """Load the disturbances from the config."""
306
+ dist = {}
307
+ if disturbances is None : # Default: no passive disturbances.
308
+ return dist
309
+ for mode , spec in disturbances .items ():
310
+ dist [mode ] = NoiseList .from_specs ([spec ])
311
+ return dist
295
312
296
313
def gate_passed (self ) -> bool :
297
314
"""Check if the drone has passed a gate.
@@ -310,50 +327,6 @@ def gate_passed(self) -> bool:
310
327
311
328
def close (self ):
312
329
"""Close the environment by stopping the drone and landing back at the starting position."""
313
- return_home = True # makes the drone simulate the return to home after stopping
314
-
315
- if return_home :
316
- # This is done to run the closing controller at a different frequency than the controller before
317
- # Does not influence other code, since this part is already in closing!
318
- # WARNING: When changing the frequency, you must also change the current _step!!!
319
- freq_new = 100 # Hz
320
- self ._steps = int (self ._steps / self .config .env .freq * freq_new )
321
- self .config .env .freq = freq_new
322
- t_step_ctrl = 1 / self .config .env .freq
323
-
324
- obs = self .obs
325
- obs ["acc" ] = np .array (
326
- [0 , 0 , 0 ]
327
- ) # TODO, use actual value when avaiable or do one step to calculate from velocity
328
- info = self .info
329
- info ["env_freq" ] = self .config .env .freq
330
- info ["drone_start_pos" ] = self .config .env .track .drone .pos
331
-
332
- controller = ClosingController (obs , info )
333
- t_total = controller .t_total
334
-
335
- for i in np .arange (int (t_total / t_step_ctrl )): # hover for some more time
336
- action = controller .compute_control (obs )
337
- action = action .astype (np .float64 ) # Drone firmware expects float64
338
- if self .config .sim .physics == PhysicsMode .SYS_ID :
339
- print ("[Warning] sys_id model not supported by return home script" )
340
- break
341
- pos , vel , acc , yaw , rpy_rate = (
342
- action [:3 ],
343
- action [3 :6 ],
344
- action [6 :9 ],
345
- action [9 ],
346
- action [10 :],
347
- )
348
- self .sim .drone .full_state_cmd (pos , vel , acc , yaw , rpy_rate )
349
- collision = self ._inner_step_loop ()
350
- terminated = self .terminated or collision
351
- obs = self .obs
352
- obs ["acc" ] = np .array ([0 , 0 , 0 ])
353
- controller .step_callback (action , obs , self .reward , terminated , False , info )
354
- if self .config .sim .gui : # Only sync if gui is active
355
- time .sleep (t_step_ctrl )
356
-
357
330
self .sim .close ()
358
331
359
332
0 commit comments