Skip to content

Implementing multistart version of theta_est using multiple sampling methods #3575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions pyomo/contrib/parmest/parmest.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ def SSE(model):
return expr


'''Adding pseudocode for draft implementation of the estimator class,
incorporating multistart.
'''
class Estimator(object):
"""
Parameter estimation class
Expand Down Expand Up @@ -273,8 +276,18 @@ def __init__(
tee=False,
diagnostic_mode=False,
solver_options=None,
# Add the extra arguments needed for running the multistart implement
# _validate_multistart_args:
# if n_restarts > 1 and theta_samplig_method is not None:
# n_restarts=1,
# theta_sampling_method="random",
):

'''first theta would be provided by the user in the initialization of
the Estimator class through the unknown parameter variables. Additional
would need to be generated using the sampling method provided by the user.
'''

# check that we have a (non-empty) list of experiments
assert isinstance(experiment_list, list)
self.exp_list = experiment_list
Expand All @@ -300,6 +313,10 @@ def __init__(
self.diagnostic_mode = diagnostic_mode
self.solver_options = solver_options

# add the extra multistart arguments to the Estimator class
# self.n_restarts = n_restarts
# self.theta_sampling_method = theta_sampling_method

# TODO: delete this when the deprecated interface is removed
self.pest_deprecated = None

Expand Down Expand Up @@ -447,6 +464,36 @@ def TotalCost_rule(model):
parmest_model = utils.convert_params_to_vars(model, theta_names, fix_vars=False)

return parmest_model

# Make new private method, _generalize_initial_theta:
# This method will be used to generalize the initial theta values for multistart
# optimization. It will take the theta names and the initial theta values
# and return a dictionary of theta names and their corresponding values.
# def _generalize_initial_theta(self, theta_names, initial_theta):
# if n_restarts == 1:
# # If only one restart, return an empty list
# return []

# return {theta_names[i]: initial_theta[i] for i in range(len(theta_names))}
# if self.method == "random":
# # Generate random theta values
# theta_vals = np.random.uniform(lower_bound, upper_bound, size=len(theta_names)
# else:
# # Generate theta values using Latin hypercube sampling or Sobol sampling
# samples

# elif self.method == "latin_hypercube":
# # Generate theta values using Latin hypercube sampling
# sampler = scipy.stats.qmc.LatinHypercube(d=len(theta_names))
# samples = sampler.random(n=self.n_restarts)
# theta_vals = np.array([lower_bound + (upper_bound - lower_bound) * theta for theta in samples])

# elif self.method == "sobol":
# sampler = scipy.stats.qmc.Sobol(d=len(theta_names))
# samples = sampler.random(n=self.n_restarts)
# theta_vals = np.array([lower_bound + (upper_bound - lower_bound) * theta for theta in samples])

# return theta_vals_multistart

def _instance_creation_callback(self, experiment_number=None, cb_data=None):
model = self._create_parmest_model(experiment_number)
Expand Down Expand Up @@ -921,6 +968,125 @@ def theta_est(
cov_n=cov_n,
)

'''
def theta_est_multistart(
self,
n_restarts=1,
theta_vals=None,
theta_sampling_method="random",
solver="ef_ipopt",
return_values=[],
calc_cov=False,
cov_n=None,
):
"""
Parameter estimation using multistart optimization

Parameters
----------
n_restarts: int, optional
Number of restarts for multistart. Default is 1.
theta_sampling_method: string, optional
Method used to sample theta values. Options are "random", "latin_hypercube", or "sobol".
Default is "random".
solver: string, optional
Currently only "ef_ipopt" is supported. Default is "ef_ipopt".
return_values: list, optional
List of Variable names, used to return values from the model for data reconciliation
calc_cov: boolean, optional
If True, calculate and return the covariance matrix (only for "ef_ipopt" solver).
Default is False.
cov_n: int, optional
If calc_cov=True, then the user needs to supply the number of datapoints
that are used in the objective function.

Returns
-------
objectiveval: float
The objective function value
thetavals: pd.Series
Estimated values for theta
variable values: pd.DataFrame
Variable values for each variable name in return_values (only for solver='ef_ipopt')
cov: pd.DataFrame
Covariance matrix of the fitted parameters (only for solver='ef_ipopt')
"""

# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return print(
"Multistart is not supported in the deprecated parmest interface")
)

assert isinstance(n_restarts, int)
assert isinstance(theta_sampling_method, str)
assert isinstance(solver, str)
assert isinstance(return_values, list)
assert isinstance(calc_cov, bool)
if calc_cov:
num_unknowns = max(
[
len(experiment.get_labeled_model().unknown_parameters)
for experiment in self.exp_list
]
)
assert isinstance(cov_n, int), (
"The number of datapoints that are used in the objective function is "
"required to calculate the covariance matrix"
)
assert (
cov_n > num_unknowns
), "The number of datapoints must be greater than the number of parameters to estimate"
if n_restarts > 1 and theta_sampling_method is not None:
call self._generalize_initial_theta(
self.estimator_theta_names, self.initial_theta
)
# make empty list to store results


theta_vals = self._generalize_initial_theta(
self.estimator_theta_names, self.initial_theta, self.n_restarts, theta_sampling_method
)


results = []
for i in range(n_restarts):
# for number of restarts, call the self._Q_opt method
# with the theta values generated using the _generalize_initial_theta method

# Call the _Q_opt method with the generated theta values
objectiveval, thetavals, variable_values, cov = self._Q_opt(
ThetaVals=theta_vals,
solver=solver,
return_values=return_values,
calc_cov=calc_cov,
cov_n=cov_n,
)
# Store the results in a list or DataFrame
# depending on the number of restarts
if n_restarts > 1 and cov is not None:
results.append(
{
"objectiveval": objectiveval,
"thetavals": thetavals,
"variable_values": variable_values,
"cov": cov,
}
elif n_restarts > 1 and cov is None:
results.append(
{ objectiveval: objectiveval,
"thetavals": thetavals,
"variable_values": variable_values,
}
)
return pd.DataFrame(results)
else:
return objectiveval, thetavals, variable_values, cov


)

'''
def theta_est_bootstrap(
self,
bootstrap_samples,
Expand Down