diff --git a/README.md b/README.md index dae20e8..3df93c3 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ For manual installation follow these instructions: 4. Enable execution privilege on the file "t_renderer" with: ```bash -chmod +x $ACADOS_ROOT_DIR/bin/t_renderer +chmod +x $ACADOS_SOURCE_DIR/bin/t_renderer ``` #### 5. Install acados_template @@ -103,8 +103,8 @@ You can build the C++ interface and run the MPC using the C++ interface, from th ```bash mkdir -p build cd build -cmake .. -make -j4 -DBUILD_EXAMPLES=ON +cmake .. -DBUILD_EXAMPLES=ON +make -j4 ``` You can run the example: diff --git a/examples/position_config.yaml b/examples/position_config.yaml new file mode 100644 index 0000000..e8f740c --- /dev/null +++ b/examples/position_config.yaml @@ -0,0 +1,25 @@ +sim_config: + max_speed: 5.0 # m/s + waypoints: + - [14.0, 25.0, 1.23] + - [30.0, 19.0, 1.23] + - [46.0, 22.0, 1.23] + - [63.0, 20.0, 3.93] + - [85.0, 18.0, 3.93] + - [93.0, 16.0, 2.93] + - [88.0, 19.0, 1.5] + - [85.0, 18.0, 1.23] + - [68.0, 13.0, 1.23] + - [55.0, 07.0, 1.23] + - [37.0, 12.0, 1.23] + - [19.7, 07.0, 1.23] + - [09.0, 14.0, 1.23] + path_facing: false +controller: + mpc: + Q: [5.0, 5.0, 10.0, 0.1, 0.1, 20.0, 0.1, 0.1, 0.1] # Weight for internal states: [x, y, z, roll, pitch, yaw, vx, vy, vz] + Qe: [10.0, 10.0, 20.0, 0.1, 0.1, 20.0, 0.1, 0.1, 0.1] # Weight for end states: [x, y, z, roll, pitch, yaw, vx, vy, vz] + R: [0.8, 1.0, 1.0, 0.5] # Weight for control inputs: [thrust, wx, wy, wz] + lbu: [0.01, -5.0, -5.0, -5.0] # Lower bounds for control inputs: [thrust, wx, wy, wz] + ubu: [40.0, 5.0, 5.0, 5.0] # Upper bounds for control inputs: [thrust, wx, wy, wz] + p: [1.0, 1.0, 0.0, 0.0, 0.0] # Online parameter: [mass, qw_ref, qx_ref, qy_ref, qz_ref] \ No newline at end of file diff --git a/examples/position_example.py b/examples/position_example.py new file mode 100644 index 0000000..0b7ae20 --- /dev/null +++ b/examples/position_example.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Universidad Politécnica de Madrid +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the Universidad Politécnica de Madrid nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +"""MPC Test using Acados Integrator.""" + +__authors__ = 'Rafael Pérez Seguí' +__copyright__ = 'Copyright (c) 2022 Universidad Politécnica de Madrid' +__license__ = 'BSD-3-Clause' + +from functools import wraps + +from mpc.mpc_controller import MPC, mpc_lib +from mpc.read_config import read_mpc_params +import numpy as np +import time +from tqdm import tqdm +from utils.plot_results import plotSim3D +from utils.position_utils import euler_to_quaternion, read_yaml_params, CsvLogger, SimParams + +SIM_TIME = 30.0 + + +def position_reference_to_mpc_reference(position_reference, ref_yaw: float = 0.0): + """Convert trajectory point to MPC reference.""" + return mpc_lib.CaState.get_state( + position=position_reference, + orientation=euler_to_quaternion(0.0, 0.0, ref_yaw), + ) + + +def progress_bar(func): + @wraps(func) + def wrapper(mpc, simulator, trajectory_generator, logger, *args, **kwargs): + sim_max_t = SIM_TIME + + pbar = tqdm(total=sim_max_t, desc=f'Progress {func.__name__}', unit='iter', + bar_format='{l_bar}{bar} | {n:.4f}/{total:.2f} ' + '[{elapsed}<{remaining}, {rate_fmt}]') + + result = func(mpc, simulator, trajectory_generator, logger, pbar, *args, **kwargs) + + pbar.close() + return result + return wrapper + + +@progress_bar +def test_trajectory_controller( + mpc: MPC, + integrator: mpc_lib.AcadosSimSolver, + simulation_data: SimParams, + logger: CsvLogger, + pbar): + """Test trajectory controller.""" + x = mpc_lib.CaState.get_state() + u = mpc_lib.CaControl.get_control() + y = mpc_lib.CaState.get_state() + + prediction_steps = mpc.prediction_steps + prediction_horizon = mpc.prediction_horizon + tf = prediction_horizon / prediction_steps + + t = 0.0 + max_time = SIM_TIME + + mpc_solve_times = np.zeros(0) + state_history = np.zeros(7) + reference_history = np.zeros(7) + + position_references = simulation_data.waypoints + pos_index = 0 + pos_ref = position_reference_to_mpc_reference(position_references[0]) + + logger.save(t, x, y, u) + while t < max_time: + reference = np.zeros((prediction_steps + 1, mpc.x_dim)) + # print(reference_trajectory.shape) # (101, 10) + for i in range(prediction_steps + 1): + reference[i, :] = position_reference_to_mpc_reference( + position_references[pos_index]) + + current_time = time.time() + u = mpc.evaluate(x, reference[:-1], reference[-1][:mpc.x_dim]) + mpc_solve_times = np.append(mpc_solve_times, time.time() - current_time) + + integrator.set('x', x) + integrator.set('u', u) + status = integrator.solve() + if status != 0: + raise Exception( + 'acados integrator returned status {}. Exiting.'.format(status)) + x = integrator.get('x') + + t += tf + + # Update history + state_history = np.vstack((state_history, x[:7])) + reference_history = np.vstack((reference_history, reference[0][:7])) + + # Log data + y = mpc_lib.CaState.get_state( + position=reference[0][0:3], + orientation=reference[0][3:7], + linear_velocity=reference[0][7:10]) + logger.save(t, x, y, u) + + # Compute error between current state x and reference state reference[0][0:3] + error = np.linalg.norm(x[:3] - reference[0][:3]) + if error < 0.1 and pos_index < len(position_references) - 1: + pos_index += 1 + pos_ref = position_reference_to_mpc_reference(position_references[pos_index]) + print(f'Position reference updated to {pos_ref[:3]} at time {t:.2f}s') + + pbar.update(tf) + print(f'MPC solve time mean: {np.mean(mpc_solve_times)}') + plotSim3D(state_history, reference_history) + + +if __name__ == '__main__': + # Params + simulation_yaml = read_yaml_params('examples/position_config.yaml') + + # MPC Params + yaml_data = read_mpc_params('mpc_config.yaml') + + # Logger + file_name = 'mpc_log.csv' + logger = CsvLogger(file_name) + + mpc = MPC( + prediction_steps=yaml_data.N_horizon, + prediction_horizon=yaml_data.tf, + params=yaml_data.mpc_params + ) + + Tf = mpc.prediction_horizon + N = mpc.prediction_steps + dt = Tf / N + integrator = mpc.export_integrador(dt) + + test_trajectory_controller( + mpc, + integrator, + simulation_yaml.sim_params, + logger) diff --git a/examples/utils/position_utils.py b/examples/utils/position_utils.py new file mode 100644 index 0000000..94315c4 --- /dev/null +++ b/examples/utils/position_utils.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Universidad Politécnica de Madrid +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the Universidad Politécnica de Madrid nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +"""Example utils.""" + +__authors__ = 'Rafael Pérez Seguí' +__copyright__ = 'Copyright (c) 2022 Universidad Politécnica de Madrid' +__license__ = 'BSD-3-Clause' + +from dataclasses import dataclass, field + +import math +import yaml +import os +import numpy as np + + +def euler_to_quaternion(roll: float, pitch: float, yaw: float) -> np.ndarray: + """ + Convert Euler angles to a quaternion. + + :param roll: Roll angle in radians. + :param pitch: Pitch angle in radians. + :param yaw: Yaw angle in radians. + :return: np.ndarray object. + """ + # Calculate half angles + roll_half = roll * 0.5 + pitch_half = pitch * 0.5 + yaw_half = yaw * 0.5 + + # Calculate sine and cosine of the half angles + sr = math.sin(roll_half) + cr = math.cos(roll_half) + sp = math.sin(pitch_half) + cp = math.cos(pitch_half) + sy = math.sin(yaw_half) + cy = math.cos(yaw_half) + + # Calculate quaternion components + w = cr * cp * cy + sr * sp * sy + x = sr * cp * cy - cr * sp * sy + y = cr * sp * cy + sr * cp * sy + z = cr * cp * sy - sr * sp * cy + + # Return the quaternion + q = np.array([w, x, y, z]) + return q / np.linalg.norm(q) + + +def compute_path_facing(velocity: np.ndarray) -> list: + """ + Compute the quaternion facing based on velocity. + + :param velocity: 3D velocity vector. + :return: Quaternion as a list [w, x, y, z]. + """ + yaw = math.atan2(velocity[1], velocity[0]) + pitch, roll = 0.0, 0.0 + + q = euler_to_quaternion(roll, pitch, yaw) + return np.array([q[0], q[1], q[2], q[3]]) + + +@dataclass +class YamlMPCData: + Q: np.ndarray = np.zeros((10, 10)) + Qe: np.ndarray = np.zeros((10, 10)) + R: np.ndarray = np.zeros((4, 4)) + lbu: np.ndarray = np.zeros(4) + ubu: np.ndarray = np.zeros(4) + p: np.ndarray = np.zeros(5) + + +@dataclass +class SimParams: + """ + Sim parameters. + + :param max_speed(float): Maximum speed of the UAV. + :param waypoints(np.ndarray): Waypoints of the path. + :param floor_height(float): Floor height. + :param path_facing(bool): Path facing. + """ + + max_speed: float = 1.0 + waypoints: list = field(default_factory=lambda: []) + path_facing: bool = False + + +@dataclass +class YamlData: + """ + Yaml data. + + :param sim_params(SimParams): Simulation parameters. + :param mpc_data(YamlMPCData): MPC data. + """ + + sim_params: SimParams = field(default_factory=lambda: SimParams()) + mpc_data: YamlMPCData = field(default_factory=lambda: YamlMPCData()) + + +def read_yaml_params(file_path: str): + """ + Read YAML configuration file and populate YamlData object. + + :param file_path: Path to the YAML file. + """ + if not os.path.exists(file_path): + absolute_simulation_config_path = os.path.abspath(file_path) + print(f"File {absolute_simulation_config_path} does not exist.") + raise ValueError("File does not exist") + + with open(file_path, 'r') as f: + config = yaml.safe_load(f) + + data = YamlData() + + # Read simulation parameters + data.sim_params.max_speed = config["sim_config"]["max_speed"] + + for waypoint in config["sim_config"]["waypoints"]: + data.sim_params.waypoints.append( + np.array([waypoint[0], waypoint[1], waypoint[2]])) + + data.sim_params.path_facing = config["sim_config"]["path_facing"] + + data.mpc_data.Q = np.diag(np.array(config["controller"]["mpc"]["Q"], dtype=np.float64)) + data.mpc_data.Qe = np.diag(np.array(config["controller"]["mpc"]["Qe"], dtype=np.float64)) + data.mpc_data.R = np.diag(np.array(config["controller"]["mpc"]["R"], dtype=np.float64)) + data.mpc_data.lbu = np.array(config["controller"]["mpc"]["lbu"], dtype=np.float64) + data.mpc_data.ubu = np.array(config["controller"]["mpc"]["ubu"], dtype=np.float64) + data.mpc_data.p = np.array(config["controller"]["mpc"]["p"], dtype=np.float64) + + return data + + +class CsvLogger: + """Log simulation data to a csv file.""" + + def __init__(self, file_name: str) -> None: + """ + Log simulation data to a csv file. + + :param file_name(str): Name of the file to save the data. + """ + self.file_name = file_name + print(f'Saving to file: {self.file_name}') + self.file = open(self.file_name, 'w') + self.file.write( + 'time,' + 'x,y,z,qw,qx,qy,qz,vx,vy,vz,' + 'x_ref,y_ref,z_ref,qw_ref,qx_ref,qy_ref,qz_ref,vx_ref,vy_ref,vz_ref' + 'thrust_ref,wx_ref,wy_ref,wz_ref\n') + + def add_double(self, data: float) -> None: + """ + Add a double data to the csv file. + + :param data(float): Double data to add. + """ + if data is None: + raise ValueError('Data is None') + self.file.write(f'{data},') + + def add_string(self, data: str, add_final_comma: bool = True) -> None: + """ + Add a string data to the csv file. + + :param data(str): String data to add. + :param add_final_comma(bool): Add a final comma to the string. + """ + self.file.write(f'{data}') + if add_final_comma: + self.file.write(',') + + def add_vector_row(self, data: np.array, add_final_comma: bool = True) -> None: + """ + Add a vector data to the csv file. + + :param data(np.array): Vector data to add. + :param add_final_comma(bool): Add a final comma to the string. + """ + for i in range(data.size): + if data[i] is None: + raise ValueError('Data is None') + self.file.write(f'{data[i]}') + if i < data.size - 1: + self.file.write(',') + elif add_final_comma: + self.file.write(',') + + def save(self, time: float, x: np.ndarray, y: np.ndarray, u: np.ndarray) -> None: + """ + Save the simulation data to the csv file. + + :param time(float): Current simulation time. + :param simulator(ms.Simulator): Simulator object. + """ + self.add_double(time) + + # State + self.add_vector_row(x) + + # Reference position + self.add_vector_row(y) + + # Control + self.add_vector_row(u, False) + + # End line + self.file.write('\n') + + def close(self) -> None: + """Close the csv file.""" + self.file.close() + + +if __name__ == '__main__': + # Params + simulation_yaml = read_yaml_params('examples/position_config.yaml') diff --git a/mpc/mpc_controller.py b/mpc/mpc_controller.py index 581ee0a..8af6f53 100644 --- a/mpc/mpc_controller.py +++ b/mpc/mpc_controller.py @@ -111,6 +111,22 @@ def trajectory_to_acro( u = self.compute_control_action(state, trajectory[:-1, :], trajectory[-1, :]) return u + def position_to_acro( + self, + position: np.ndarray, + max_velocity: float = 0.0) -> np.ndarray: + """ + Convert a desired position into a desired attitude and thrust. + + :param position(np.array): Desired position [x, y, z] (m). + :param max_velocity(float): Maximum velocity (m/s). If 0.0, + the velocity is not considered. + + :return: Desired acro [thrust, wx, wy, wz] (N, rad/s). + """ + # Set velocity constraints + # if max_velocity > 0.0: + # Getters and Setters def update_params(self, mpc_params: mpc_lib.AcadosMPCParams) -> None: """ diff --git a/mpc/mpc_controller_lib/acados_solver.py b/mpc/mpc_controller_lib/acados_solver.py index b6bbc28..9677758 100644 --- a/mpc/mpc_controller_lib/acados_solver.py +++ b/mpc/mpc_controller_lib/acados_solver.py @@ -356,6 +356,27 @@ def _set_references(self, yref: np.ndarray, yref_e: np.ndarray) -> None: ]) self.solver.set(self.N, 'p', p) + def _set_state_constraints( + self, + jbx: list, + lbx: np.ndarray, + ubx: np.ndarray) -> None: + """ + Set the state constraints for the MPC solver. + + :param jbx: List of indices for the state constraints. + :param lbx: Lower bounds for the state constraints. + :param ubx: Upper bounds for the state constraints. + """ + if len(jbx) != self.x_dim: + raise ValueError(f'jbx must have {self.x_dim} elements, got {len(jbx)}') + + for node in range(self.N + 1): + for i, j in enumerate(jbx): + # Set lower bounds + self.solver.constraints_set(node, 'lbx', lbx) + # Set upper bounds + self.solver.constraints_set(node, 'ubx', ubx) def evaluate( self,