diff --git a/odetoolbox/__init__.py b/odetoolbox/__init__.py index 7fda23ee..cc6875cd 100644 --- a/odetoolbox/__init__.py +++ b/odetoolbox/__init__.py @@ -23,6 +23,7 @@ import json import logging import sys +import numpy as np import sympy from sympy.core.expr import Expr as SympyExpr @@ -231,64 +232,74 @@ def _analysis(indict, disable_stiffness_check: bool = False, disable_analytic_so sys.exit(1) shape_sys = SystemOfShapes.from_shapes(shapes, parameters=parameters) - _, node_is_analytically_solvable = _find_analytically_solvable_equations(shape_sys, shapes, parameters=parameters) + if Config().use_exponential_euler_solver: + solvers_json = [] - # - # generate analytical solutions (propagators) where possible - # + # compute exponential Euler solver + solver_json = shape_sys.generate_exponential_euler_solver() + solvers_json.append(solver_json) + logging.info("Generating propagators for the following symbols: " + ", ".join([str(k) for k in shape_sys.x_])) - solvers_json = [] - if disable_analytic_solver: - analytic_syms = [] else: - analytic_syms = [node_sym for node_sym, _node_is_analytically_solvable in node_is_analytically_solvable.items() if _node_is_analytically_solvable] + _, node_is_analytically_solvable = _find_analytically_solvable_equations(shape_sys, shapes, parameters=parameters) - analytic_solver_json = None - if analytic_syms: - logging.info("Generating propagators for the following symbols: " + ", ".join([str(k) for k in analytic_syms])) - sub_sys = shape_sys.get_sub_system(analytic_syms) - analytic_solver_json = sub_sys.generate_propagator_solver() - analytic_solver_json["solver"] = "analytical" - solvers_json.append(analytic_solver_json) + # + # generate analytical solutions (propagators) where possible + # - # - # generate numerical solvers for the remainder - # - - if len(analytic_syms) < len(shape_sys.x_): - numeric_syms = list(set(shape_sys.x_) - set(analytic_syms)) - logging.info("Generating numerical solver for the following symbols: " + ", ".join([str(sym) for sym in numeric_syms])) - sub_sys = shape_sys.get_sub_system(numeric_syms) - solver_json = sub_sys.generate_numeric_solver(state_variables=shape_sys.x_) - solver_json["solver"] = "numeric" # will be appended to if stiffness testing is used - if not disable_stiffness_check: - if not PYGSL_AVAILABLE: - raise Exception("Stiffness test requested, but PyGSL not available") - - logging.info("Performing stiffness test...") - kwargs = {} # type: Dict[str, Any] - if "options" in indict.keys() and "random_seed" in indict["options"].keys(): - random_seed = int(indict["options"]["random_seed"]) - assert random_seed >= 0, "Random seed needs to be a non-negative integer" - kwargs["random_seed"] = random_seed - if "parameters" in indict.keys(): - kwargs["parameters"] = indict["parameters"] - if "stimuli" in indict.keys(): - kwargs["stimuli"] = indict["stimuli"] - for key in ["sim_time", "max_step_size", "integration_accuracy_abs", "integration_accuracy_rel"]: - if "options" in indict.keys() and key in Config().keys(): - kwargs[key] = float(Config()[key]) - if not analytic_solver_json is None: - kwargs["analytic_solver_dict"] = analytic_solver_json - tester = StiffnessTester(sub_sys, shapes, **kwargs) - solver_type = tester.check_stiffness() - if not solver_type is None: - solver_json["solver"] += "-" + solver_type - logging.info(solver_type + " scheme") - - solvers_json.append(solver_json) + solvers_json = [] + if disable_analytic_solver: + analytic_syms = [] + else: + analytic_syms = [node_sym for node_sym, _node_is_analytically_solvable in node_is_analytically_solvable.items() if _node_is_analytically_solvable] + + analytic_solver_json = None + if analytic_syms: + logging.info("Generating propagators for the following symbols: " + ", ".join([str(k) for k in analytic_syms])) + sub_sys = shape_sys.get_sub_system(analytic_syms) + analytic_solver_json = sub_sys.generate_propagator_solver() + analytic_solver_json["solver"] = "analytical" + solvers_json.append(analytic_solver_json) + + + # + # generate numerical solvers for the remainder + # + + if len(analytic_syms) < len(shape_sys.x_): + numeric_syms = list(set(shape_sys.x_) - set(analytic_syms)) + logging.info("Generating numerical solver for the following symbols: " + ", ".join([str(sym) for sym in numeric_syms])) + sub_sys = shape_sys.get_sub_system(numeric_syms) + solver_json = sub_sys.generate_numeric_solver(state_variables=shape_sys.x_) + solver_json["solver"] = "numeric" # will be appended to if stiffness testing is used + if not disable_stiffness_check: + if not PYGSL_AVAILABLE: + raise Exception("Stiffness test requested, but PyGSL not available") + + logging.info("Performing stiffness test...") + kwargs = {} # type: Dict[str, Any] + if "options" in indict.keys() and "random_seed" in indict["options"].keys(): + random_seed = int(indict["options"]["random_seed"]) + assert random_seed >= 0, "Random seed needs to be a non-negative integer" + kwargs["random_seed"] = random_seed + if "parameters" in indict.keys(): + kwargs["parameters"] = indict["parameters"] + if "stimuli" in indict.keys(): + kwargs["stimuli"] = indict["stimuli"] + for key in ["sim_time", "max_step_size", "integration_accuracy_abs", "integration_accuracy_rel"]: + if "options" in indict.keys() and key in Config().keys(): + kwargs[key] = float(Config()[key]) + if not analytic_solver_json is None: + kwargs["analytic_solver_dict"] = analytic_solver_json + tester = StiffnessTester(sub_sys, shapes, **kwargs) + solver_type = tester.check_stiffness() + if not solver_type is None: + solver_json["solver"] += "-" + solver_type + logging.info(solver_type + " scheme") + + solvers_json.append(solver_json) # diff --git a/odetoolbox/config.py b/odetoolbox/config.py index 6f4fe97e..1242d970 100644 --- a/odetoolbox/config.py +++ b/odetoolbox/config.py @@ -35,7 +35,8 @@ class Config: "max_step_size": 999., "integration_accuracy_abs": 1E-6, "integration_accuracy_rel": 1E-6, - "forbidden_names": ["oo", "zoo", "nan", "NaN", "__h"] + "forbidden_names": ["oo", "zoo", "nan", "NaN", "__h"], + "use_exponential_euler_solver": True } def __getitem__(self, key): diff --git a/odetoolbox/system_of_shapes.py b/odetoolbox/system_of_shapes.py index ccd66e3a..eb95c584 100644 --- a/odetoolbox/system_of_shapes.py +++ b/odetoolbox/system_of_shapes.py @@ -88,7 +88,8 @@ def __init__(self, x: sympy.Matrix, A: sympy.Matrix, b: sympy.Matrix, c: sympy.M :param b: Vector containing inhomogeneous part (constant term). :param c: Vector containing nonlinear part. """ - logging.debug("Initializing system of shapes with x = " + str(x) + ", A = " + str(A) + ", b = " + str(b) + ", c = " + str(c)) + + #logging.debug("Initializing system of shapes with x = " + str(x) + ", A = " + str(A) + ", b = " + str(b) + ", c = " + str(c)) assert x.shape[0] == A.shape[0] == A.shape[1] == b.shape[0] == c.shape[0] self.x_ = x self.A_ = A @@ -96,6 +97,11 @@ def __init__(self, x: sympy.Matrix, A: sympy.Matrix, b: sympy.Matrix, c: sympy.M self.c_ = c self.shapes_ = shapes + logging.debug("System of equations:") + logging.debug("x = " + str(self.x_)) + logging.debug("A = " + repr(self.A_)) + logging.debug("b = " + str(self.b_)) + logging.debug("c = " + str(self.c_)) def get_shape_by_symbol(self, sym: str) -> Optional[Shape]: for shape in self.shapes_: @@ -259,11 +265,11 @@ def generate_propagator_solver(self): update_expr = {} # keys are str(variable symbol), values are str(expressions) that evaluate to the new value of the corresponding key for row in range(P_sym.shape[0]): # assemble update expression for symbol ``self.x_[row]`` - if not _is_zero(self.c_[row]): - raise PropagatorGenerationException("For symbol " + str(self.x_[row]) + ": nonlinear part should be zero for propagators") + # if not _is_zero(self.c_[row]): + # raise PropagatorGenerationException("For symbol " + str(self.x_[row]) + ": nonlinear part should be zero for propagators") - if not _is_zero(self.b_[row]) and self.shape_order_from_system_matrix(row) > 1: - raise PropagatorGenerationException("For symbol " + str(self.x_[row]) + ": higher-order inhomogeneous ODEs are not supported") + # if not _is_zero(self.b_[row]) and self.shape_order_from_system_matrix(row) > 1: + # raise PropagatorGenerationException("For symbol " + str(self.x_[row]) + ": higher-order inhomogeneous ODEs are not supported") update_expr_terms = [] for col in range(P_sym.shape[1]): @@ -305,6 +311,7 @@ def generate_propagator_solver(self): return solver_dict + def generate_numeric_solver(self, state_variables=None): r""" Generate the symbolic expressions for numeric integration state updates; return as JSON. @@ -319,6 +326,102 @@ def generate_numeric_solver(self, state_variables=None): return solver_dict + def generate_update_expr_from_propagator_matrix(self, P, prop_prefix: str): + r"""generate symbols for each nonzero entry of the nonlin_P matrix""" + + P_sym = sympy.zeros(*P.shape) # each entry in the propagator matrix is assigned its own symbol + P_expr = {} # the expression corresponding to each propagator symbol + update_expr = {} # keys are str(variable symbol), values are str(expressions) that evaluate to the new value of the corresponding key + for row in range(P_sym.shape[0]): + # assemble update expression for symbol ``self.x_[row]`` + update_expr_terms = [] + for col in range(P_sym.shape[1]): + if not _is_zero(P[row, col]): + sym_str = "__NP__{}__{}".format(str(self.x_[row]), str(self.x_[col])) + P_sym[row, col] = sympy.parsing.sympy_parser.parse_expr(sym_str, global_dict=Shape._sympy_globals) + P_expr[sym_str] = P[row, col] + update_expr_terms.append(sym_str + " * " + str(self.x_[col])) + + # if not _is_zero(self.b_[row]): + # # this is an inhomogeneous ODE + # if _is_zero(self.A_[row, row]): + # # of the form x' = const + # update_expr_terms.append(Config().output_timestep_symbol + " * " + str(self.b_[row])) + # else: + # particular_solution = -self.b_[row] / self.A_[row, row] + # sym_str = "__P__{}__{}".format(str(self.x_[row]), str(self.x_[row])) + # update_expr_terms.append("-" + sym_str + " * " + str(self.x_[row])) # remove the term (add its inverse) that would have corresponded to a homogeneous solution and that was added in the ``for col...`` loop above + # update_expr_terms.append(sym_str + " * (" + str(self.x_[row]) + " - (" + str(particular_solution) + "))" + " + (" + str(particular_solution) + ")") + + update_expr[str(self.x_[row])] = " + ".join(update_expr_terms) + update_expr[str(self.x_[row])] = sympy.parsing.sympy_parser.parse_expr(update_expr[str(self.x_[row])], global_dict=Shape._sympy_globals) + if not _is_zero(self.b_[row]): + # only simplify in case an inhomogeneous term is present + update_expr[str(self.x_[row])] = _custom_simplify_expr(update_expr[str(self.x_[row])]) + logging.info("update_expr[" + str(self.x_[row]) + "] = " + str(update_expr[str(self.x_[row])])) + + return update_expr + + + + + + def generate_nonlin_propagators(self, P): + # + # generate symbols for each nonzero entry of the propagator matrix + # + + P_sym = sympy.zeros(*P.shape) # each entry in the propagator matrix is assigned its own symbol + P_expr = {} # the expression corresponding to each propagator symbol + for row in range(P_sym.shape[0]): + for col in range(P_sym.shape[1]): + if not _is_zero(P[row, col]): + sym_str = "__NP__{}__{}".format(str(self.x_[row]), str(self.x_[col])) + P_sym[row, col] = sympy.parsing.sympy_parser.parse_expr(sym_str, global_dict=Shape._sympy_globals) + P_expr[sym_str] = P[row, col] + + return P_expr + + def generate_exponential_euler_solver(self): + r""" + """ + + # for the linear part, compute propagators + linear_propagator_matrix = self._generate_propagator_matrix(self.A_) + linear_solver_json = self.generate_propagator_solver() + + # for the nonlinear part + nonlin_P = self.A_.inv() - self.A_.inv() * linear_propagator_matrix + nonlin_update_expr = self.generate_update_expr_from_propagator_matrix(nonlin_P, prop_prefix="__NP") + nonlin_propagators = self.generate_nonlin_propagators(nonlin_P) + + all_state_symbols = [str(sym) for sym in self.x_] + initial_values = {sym: str(self.get_initial_value(sym)) for sym in all_state_symbols} + + combined_update_expr = linear_solver_json["update_expressions"] + for sym in nonlin_update_expr.keys(): + if sym in combined_update_expr.keys(): + combined_update_expr[sym] += nonlin_update_expr[sym] + else: + combined_update_expr[sym] = nonlin_update_expr[sym] + + print("linear_solver_json = " + str(linear_solver_json)) + print("nonlin_update_expr = " + str(nonlin_update_expr)) + + solver_dict = {"solver": "analytic", # actually should be "exponential_euler"; but "analytic" fools NESTML into accepting the update equations and propagators with the existing software infrastructure (no need for a special case) + "update_expressions": combined_update_expr, + "state_variables": all_state_symbols, + "propagators": linear_solver_json["propagators"] | nonlin_propagators, + "initial_values": initial_values } + print(solver_dict) + # logging.info(json.dumps(solver_dict, indent=4, sort_keys=True)) + return solver_dict + + + + + + def reconstitute_expr(self, state_variables=None): r""" diff --git a/tests/test_exponential_euler.py b/tests/test_exponential_euler.py new file mode 100644 index 00000000..cbed14e0 --- /dev/null +++ b/tests/test_exponential_euler.py @@ -0,0 +1,205 @@ +# +# test_exponential_euler.py +# +# This file is part of the NEST ODE toolbox. +# +# Copyright (C) 2017 The NEST Initiative +# +# The NEST ODE toolbox is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation, either version 2 of +# the License, or (at your option) any later version. +# +# The NEST ODE toolbox is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with NEST. If not, see . +# + +from matplotlib import pyplot as plt +import numpy as np +import pytest +import scipy + +from tests.test_utils import _open_json + +try: + import pygsl + PYGSL_AVAILABLE = True +except ImportError: + PYGSL_AVAILABLE = False + +import odetoolbox +from odetoolbox.analytic_integrator import AnalyticIntegrator + + +class TestExponentialEuler: + + def generate_reference_lotka_volterra_timeseries(self): + def lotka_volterra_ode(t, y, alpha, beta, delta, gmma): + """ + Defines the Lotka-Volterra differential equations. + + Args: + t: Time (not explicitly used in this autonomous system, but required by solve_ivp). + y: Array of current population values [prey, predator]. + alpha: Prey natural growth rate. + beta: Predation rate coefficient. + delta: Predator growth efficiency coefficient. + gmma: Predator natural death rate. + + Returns: + Array of the derivatives [dx/dt, dy/dt]. + """ + prey, predator = y + dprey_dt = alpha * prey - beta * prey * predator + dpredator_dt = delta * prey * predator - gmma * predator + return [dprey_dt, dpredator_dt] + + # 2. Set Parameters (Chosen to potentially exhibit stiffness) + # A large gmma might make the predator population crash quickly, + # introducing a faster time scale compared to prey recovery. + alpha = 1.0 # Prey growth rate + beta = 1.0 # Predation rate + delta = 1.0 # Predator efficiency + gmma = 3.0 # Predator death rate (relatively high) + + # 3. Set Initial Conditions and Time Span + y0 = [10.0, 5.0] # Initial populations [prey, predator] + t_span = (0, 30) # Time interval for integration (start, end) + t_eval = np.linspace(t_span[0], t_span[1], 500) # Points where solution is stored + + # 4. Integrate the ODEs using a stiff solver + # 'Radau' and 'BDF' are implicit methods suitable for stiff problems. + # 'LSODA' can automatically detect stiffness and switch methods. + # We'll use 'Radau' explicitly here. + sol = scipy.integrate.solve_ivp( + lotka_volterra_ode, + t_span, + y0, + method='Radau', # Solver choice for stiff systems + args=(alpha, beta, delta, gmma), + t_eval=t_eval, # Specify output times + dense_output=True # Useful for smooth plotting if needed later + ) + + # 5. Check if the integration was successful + if not sol.success: + raise Exception(f"ODE integration failed: {sol.message}") + + # 6. Extract the results + t = sol.t + prey_pop = sol.y[0] + predator_pop = sol.y[1] + + return t, prey_pop, predator_pop + + + def test_exponential_euler(self): + alpha = 1.0 + beta = 1.0 + delta = 1.0 + gmma = 3.0 + + # Initial conditions from the previous example: + x0 = 10.0 + y0 = 5.0 + + indict = {"dynamics": [ + { + "expression": "x' = alpha * x - beta * x * y", # Prey equation + "initial_value": str(x0) # Initial prey population as string + }, + { + "expression": "y' = delta * x * y - gmma * y", # Predator equation + "initial_value": str(y0) # Initial predator population as string + } + # Note: The third expression "V_m' = ..." from your example doesn't + # belong to the standard Lotka-Volterra model, so it's omitted here. + ], + "parameters": { + "alpha": str(alpha), # Prey growth rate as string + "beta": str(beta), # Predation rate as string + "delta": str(delta), # Predator efficiency as string + "gmma": str(gmma) # Predator death rate as string + # Note: Parameters "tau" and "E_L" from your example don't belong + # to the standard Lotka-Volterra model, so they are omitted here. + } +} + T = 30 + h = 0.1 + x0 = 10.0 + y0 = 5.0 + + result = odetoolbox.analysis(indict) + assert len(result) == 1 + solver_dict = result[0] + + N = int(np.ceil(T / h) + 1) + timevec = np.linspace(0., T, N) + state = {sym: [] for sym in solver_dict["state_variables"]} + state["timevec"] = [] + analytic_integrator = AnalyticIntegrator(solver_dict) + analytic_integrator.set_initial_values({"x": x0, "y": y0}) + analytic_integrator.reset() + for step, t in enumerate(timevec): + state_ = analytic_integrator.get_value(t) + state["timevec"].append(t) + for sym, val in state_.items(): + state[sym].append(val) + + for k, v in state.items(): + state[k] = np.array(v) + + t, prey_pop, predator_pop = self.generate_reference_lotka_volterra_timeseries() + + + + fig, ax = plt.subplots(figsize=(10, 6)) # Create figure and axes objects + + ax.plot(t, prey_pop, label='Prey Population (x)', color='blue') + ax.plot(t, predator_pop, label='Predator Population (y)', color='red') + + ax.plot(state["timevec"], state["x"], label='Prey Population (x)', color='blue', linestyle="--") + ax.plot(state["timevec"], state["y"], label='Predator Population (y)', color='red', linestyle="--") + + # Add labels and title + ax.set_xlabel('Time') + ax.set_ylabel('Population') + ax.set_title(f'Lotka-Volterra Predator-Prey Model (Stiff Solver: Radau)\n' + f'α={alpha}, β={beta}, δ={delta}, γ={gmma}') + + # Add legend and grid + ax.legend() + ax.grid(True) + + # Set limits (optional, adjust as needed) + ax.set_ylim(bottom=0) # Populations cannot be negative + + # Display the plot + plt.show() + + # --- Optional: Plot the phase portrait (Predator vs Prey) --- + fig_phase, ax_phase = plt.subplots(figsize=(7, 7)) + + ax_phase.plot(prey_pop, predator_pop, color='purple') + # Mark the start and end points + ax_phase.plot(prey_pop[0], predator_pop[0], 'go', label='Start') # Green circle + ax_phase.plot(prey_pop[-1], predator_pop[-1], 'mo', label='End') # Magenta circle + + + ax_phase.set_xlabel('Prey Population (x)') + ax_phase.set_ylabel('Predator Population (y)') + ax_phase.set_title('Phase Portrait (Predator vs. Prey)') + ax_phase.grid(True) + ax_phase.legend() + ax_phase.set_xlim(left=0) + ax_phase.set_ylim(bottom=0) + + plt.show() + + + # assert ...... \ No newline at end of file