diff --git a/demo/APO Sample Existing Demo.ipynb b/demo/APO Sample Existing Demo.ipynb
new file mode 100644
index 0000000..8e9df98
--- /dev/null
+++ b/demo/APO Sample Existing Demo.ipynb
@@ -0,0 +1,184 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "75f106cc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import sys\n",
+    "sys.path.insert(0, '../')\n",
+    "\n",
+    "print(sys.path)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "117a382f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import obsidian\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "print(f'obsidian version: {obsidian.__version__}')\n",
+    "\n",
+    "from obsidian.experiment import AdvExpDesigner\n",
+    "from obsidian.experiment.sampling import sample_with_bias, best_sample"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1cd0a6b0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Generate random data for this demo\n",
+    "np.random.seed(42)\n",
+    "\n",
+    "n = 1000\n",
+    "demo_data = pd.DataFrame({\n",
+    "    'reagent_conc': np.round(np.random.uniform(0.1, 1.0, n), 2),\n",
+    "    'ionic_strength': np.round(np.random.uniform(10, 100, n), 2),\n",
+    "    'surfactant_conc': np.round(np.random.uniform(0.01, 0.2, n), 3),\n",
+    "    'compound_A': np.round(np.random.uniform(0, 50, n), 2),\n",
+    "    'compound_B': np.round(np.random.uniform(0, 50, n), 2),\n",
+    "    'sugar': np.random.choice(['glucose', 'fructose', 'sucrose'], n),\n",
+    "    'surfactant': np.random.choice(['SDS', 'Tween20', 'TritonX'], n),\n",
+    "    'buffer': np.random.choice(['PBS', 'Tris', 'HEPES'], n),\n",
+    "    'pH': np.round(np.random.uniform(5.5, 8.5, n), 2)\n",
+    "})\n",
+    "\n",
+    "demo_data.index.name = 'FormulationID'\n",
+    "demo_data"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "57ed0226",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Initialize existing experimental data as an AdvExpDesigner object\n",
+    "designer = AdvExpDesigner(design_df=demo_data)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9feae24b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "\"\"\"\n",
+    "You can sample an existing dataset with or without bias.\n",
+    "Bias dictionary format: {\"column\": [lower_bound, upper_bound, relative_weight]}\n",
+    "\n",
+    "- Weight >1 increases sampling probability for in-range rows.\n",
+    "- Weight <1 decreases it.\n",
+    "- Weight = 0 excludes those rows entirely.\n",
+    "\"\"\"\n",
+    "\n",
+    "bias = {\n",
+    "    \"ionic_strength\": [50, 60, 3.0],\n",
+    "}\n",
+    "\n",
+    "seed = np.random.randint(0, 1000)\n",
+    "print(f\"Random seed for reproducibility: {seed}\")\n",
+    "\n",
+    "# We can easily create a weighted random sample of n rows using built-in Pandas functions.\n",
+    "# enforce=True restricts sampling to rows inside the bias bounds; the resulting sample may not be space-filling.\n",
+    "sample = sample_with_bias(designer.design, n=100, replace=False, seed=seed, bias=bias, plot_weights=True, enforce=False)\n",
+    "\n",
+    "sample"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ab457ed8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# One-hot encode categorical columns so Euclidean distances can be computed\n",
+    "df_encoded = pd.get_dummies(designer.design, columns=[\"sugar\", \"surfactant\", \"buffer\"], dtype=int)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9a46bcd0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "\"\"\"\n",
"perform random sampling n_trial times, select the best one via criteria metric:\n", + "metric:\n", + " - \"maximin\": maximize the minimum pairwise Euclidean distance\n", + " - \"mean_nn\": maximize the mean nearest-neighbor Euclidean distance\n", + " - \"hybrid\": 0.6*maximin + 0.4*mean_nn \n", + "\"\"\"\n", + "seed = np.random.randint(0,1000)\n", + "print(f\"Random seed for reproducibility: {seed}\")\n", + "\n", + "optimal_sample, info = best_sample(\n", + " df_encoded, 10, feature_cols=df_encoded.columns, n_trials=1000,\n", + " bias=bias, plot_weights=True, enforce=False, random_state=seed, metric=\"hybrid\"\n", + ")\n", + "\n", + "print(info)\n", + "optimal_sample\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3ea1bcd8", + "metadata": {}, + "outputs": [], + "source": [ + "#decode from one-hot encoding\n", + "normal_cols = list(optimal_sample.columns)[0:6]\n", + "encoded_cols = list(optimal_sample.columns)[6:]\n", + "decoded = pd.from_dummies(optimal_sample[encoded_cols],sep=\"_\")\n", + "optimal_design_decoded = pd.concat([optimal_sample[normal_cols], decoded], axis=1)\n", + "optimal_design_decoded" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2137e315", + "metadata": {}, + "outputs": [], + "source": [ + "print(designer.plot_histograms(optimal_design_decoded))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv (3.13.5)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/obsidian/experiment/advanced_design.py b/obsidian/experiment/advanced_design.py index e7594af..e5e60bd 100644 --- a/obsidian/experiment/advanced_design.py +++ b/obsidian/experiment/advanced_design.py @@ -22,7 +22,7 @@ class AdvExpDesigner: """ def __init__( - self, continuous_params, conditional_subparameters, subparam_mapping=None + self, continuous_params=None, conditional_subparameters=None, subparam_mapping=None, design_df=None ): """ Initializes the AdvExpDesigner with experimental parameters and optional subparameter mappings. @@ -30,17 +30,24 @@ def __init__( :param continuous_params: A dictionary containing the continuous parameters for the design. :param conditional_subparameters: A dictionary containing the conditional subparameters for the design. :param subparam_mapping: A dictionary for mapping, will be inferred if not provided. 
+        :param design_df: A Pandas DataFrame of an existing experimental design, default None.
         """
-        self.continuous_params = continuous_params
-        self.conditional_subparameters = conditional_subparameters
-        self.subparam_mapping = subparam_mapping or infer_subparam_mapping(
-            self.conditional_subparameters
-        )
-        self.continuous_keys = list(self.continuous_params.keys())
-        self.categorical_keys = list(self.conditional_subparameters.keys())
-        self.subparam_key = (
-            list(self.subparam_mapping.values())[0] if self.subparam_mapping else None
-        )
+        self.continuous_params = continuous_params or {}
+        self.conditional_subparameters = conditional_subparameters or {}
+
+        if design_df is not None and not design_df.empty:
+            self.design = design_df
+            self.categorical_keys = design_df.select_dtypes(exclude=['number']).columns.tolist()
+            if continuous_params:
+                self.continuous_keys = list(self.continuous_params.keys())
+            else:
+                self.continuous_keys = design_df.select_dtypes(include=['number']).columns.tolist()
+        else:
+            self.continuous_keys = list(self.continuous_params.keys())
+            self.categorical_keys = list(self.conditional_subparameters.keys())
+
+        self.subparam_mapping = subparam_mapping or infer_subparam_mapping(self.conditional_subparameters)
+        self.subparam_key = list(self.subparam_mapping.values())[0] if self.subparam_mapping else None
 
     def generate_design(self, seed, n_samples, optimize_categories=True):
         """
@@ -426,13 +433,16 @@ def assign_conditional_subparameter(
 
 def infer_subparam_mapping(conditional_subparameters):
     mapping = {}
-    for cat_param, levels in conditional_subparameters.items():
-        subparam_candidates = set()
-        for level_info in levels.values():
-            subparams = [k for k in level_info if k != "freq"]
-            subparam_candidates.update(subparams)
-        if len(subparam_candidates) == 1:
-            mapping[cat_param] = subparam_candidates.pop()
+    if not conditional_subparameters:
+        return mapping
+
+    for cat_param, levels in conditional_subparameters.items():
+        subparam_candidates = set()
+        for level_info in levels.values():
+            subparams = [k for k in level_info if k != "freq"]
+            subparam_candidates.update(subparams)
+        if len(subparam_candidates) == 1:
+            mapping[cat_param] = subparam_candidates.pop()
     return mapping
 
 
@@ -915,14 +925,7 @@ def plot_design_quality_evolution(metrics_df):
     metrics_df = metrics_df.sort_values("seed")
 
     fig, axes = plt.subplots(2, 3, figsize=(15, 10))
-    metrics = [
-        "D-optimality",
-        "A-optimality",
-        "Pairwise Distance CV",
-        "Max Continuous Corr",
-        "Max Categorical Corr",
-        "score",
-    ]
+    metrics = [c for c in metrics_df.columns if c != "seed"]
 
     for i, metric in enumerate(metrics):
         ax = axes[i // 3, i % 3]
diff --git a/obsidian/experiment/sampling.py b/obsidian/experiment/sampling.py
new file mode 100644
index 0000000..be4a48a
--- /dev/null
+++ b/obsidian/experiment/sampling.py
@@ -0,0 +1,144 @@
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+
+
+def generate_weights(df, n, bias, plot_weights=False, enforce=False):
+    """
+    Generates a Pandas Series of sampling weights for each row given a particular bias.
+
+    df: DataFrame of candidates
+    n: size of the design to pick
+    bias: dictionary of biases in the format: {"column": [lower_bound, upper_bound, relative_weight]}
+        - Weight >1 increases sampling probability for in-range rows.
+        - Weight <1 decreases it.
+        - Weight = 0 excludes those rows entirely.
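+        For example (hypothetical column name), bias = {"pH": [6.0, 6.5, 2.0]} makes rows
+        with pH in [6.0, 6.5] twice as likely to be drawn as out-of-range rows.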
+    plot_weights: boolean, whether to plot the distribution of weights, default False
+    enforce: boolean, whether to restrict sampling to rows satisfying the bias bounds, default False
+
+    Returns: Pandas Series of normalized row weights.
+    """
+    weights = pd.Series(1.0, index=df.index)
+    for col, params in bias.items():
+        lower, upper = params[0], params[1]
+        weight = params[2] if len(params) > 2 else 1.0
+        mask = df[col].between(lower, upper, inclusive="both")
+        if enforce:
+            # Out-of-range rows get weight 0 and can never be drawn.
+            weights *= mask.astype(float) * weight
+        else:
+            # In-range rows are scaled by `weight`; out-of-range rows keep weight 1.
+            weights *= mask.astype(float) * weight + (~mask).astype(float)
+
+    if enforce and (weights > 0).sum() < n:
+        raise ValueError(f"Not enough rows ({(weights > 0).sum()}) satisfy all enforce conditions for n={n}.")
+
+    weights = weights / weights.sum()
+
+    print("Weights min:", weights.min(), "max:", weights.max())
+
+    if plot_weights:
+        plt.figure(figsize=(8, 4))
+        plt.hist(weights, bins=50)
+        plt.title("Distribution of Sampling Weights")
+        plt.xlabel("Weight")
+        plt.ylabel("Count")
+        plt.show()
+
+    return weights
+
+
+def sample_with_bias(df, n, replace=False, seed=None, bias=None, enforce=False, plot_weights=False):
+    """
+    Returns a random sample of rows from a population, with or without bias.
+
+    df: DataFrame of candidates
+    n: int, size of the design to pick
+    replace: boolean, allow or disallow sampling the same row more than once, default False
+    seed: int, random seed for reproducible sampling, default None
+    bias: dictionary of biases in the format: {"column": [lower_bound, upper_bound, relative_weight]}, default None
+        - Weight >1 increases sampling probability for in-range rows.
+        - Weight <1 decreases it.
+        - Weight = 0 excludes those rows entirely.
+    enforce: boolean, whether to restrict sampling to rows satisfying the bias bounds, default False
+    plot_weights: boolean, whether to plot the distribution of weights, default False
+
+    Returns: Pandas DataFrame of sampled rows.
+    """
+    if bias:
+        w = generate_weights(df, n, bias, plot_weights, enforce)
+        return df.sample(n=n, replace=replace, random_state=seed, weights=w)
+    return df.sample(n=n, replace=replace, random_state=seed)
+
+
+def _space_filling_score(Z, metric="hybrid"):
+    """
+    Z: (k, d) standardized features of the candidate sample
+    metric:
+        - "maximin": maximize the minimum pairwise distance
+        - "mean_nn": maximize the mean nearest-neighbor distance
+        - "hybrid": 0.6*maximin + 0.4*mean_nn (more stable in practice)
+    """
+    k = Z.shape[0]
+    # Full pairwise Euclidean distance matrix; the diagonal is set to inf so a
+    # point is never counted as its own nearest neighbor.
+    D = np.sqrt(((Z[:, None, :] - Z[None, :, :]) ** 2).sum(-1))
+    np.fill_diagonal(D, np.inf)
+    d_min = D[np.triu_indices(k, 1)].min()
+    d_mnn = D.min(axis=1).mean()
+    if metric == "maximin":
+        return d_min
+    if metric == "mean_nn":
+        return d_mnn
+    if metric == "hybrid":
+        return 0.6 * d_min + 0.4 * d_mnn
+    raise ValueError(f"Unknown metric: {metric}")
+
+
+def best_sample(df, k, feature_cols, *, n_trials=500, bias=None, plot_weights=False, enforce=False,
+                random_state=None, standardize=True, dropna=True, metric="hybrid"):
+    """
+    Repeats random sampling n_trials times and returns the most space-filling sample.
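+    Each trial draws k rows without replacement (optionally weighted by bias), standardizes
+    their features against the full candidate population, and scores them with
+    _space_filling_score; the highest-scoring draw is returned.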
+
+    df: DataFrame of candidates
+    k: size of the design to pick
+    feature_cols: columns that define "space" (numeric; one-hot encode categoricals if needed)
+    n_trials: number of random draws to evaluate, default 500
+    bias: dictionary of biases in the format: {"column": [lower_bound, upper_bound, relative_weight]}, default None
+    plot_weights: boolean, whether to plot the distribution of weights, default False
+    enforce: boolean, whether to restrict sampling to rows satisfying the bias bounds, default False
+    random_state: seed for the reproducible random stream, default None
+    standardize: boolean, whether to z-score features against the full population, default True
+    dropna: boolean, whether to drop rows with missing feature values, default True
+    metric: "maximin" | "mean_nn" | "hybrid", see _space_filling_score
+
+    Returns: (DataFrame of the best sample, info dict with "score", "metric", "n_trials").
+    """
+    base = df[feature_cols]
+    idx = base.dropna().index if dropna else base.index
+    dfv = df.loc[idx]
+    Xfull = base.loc[idx].to_numpy(dtype=float)
+
+    # Standardize once using the FULL population (not per trial) for fair geometry.
+    if standardize:
+        mu = Xfull.mean(axis=0)
+        sig = Xfull.std(axis=0)
+        sig[sig == 0] = 1.0
+        def toZ(X): return (X - mu) / sig
+    else:
+        def toZ(X): return X
+
+    # Bias weights are computed on the full df, then aligned to the filtered rows.
+    w = None
+    if bias:
+        w = generate_weights(df, k, bias, plot_weights, enforce)
+        w = w.reindex(dfv.index).fillna(0.0)
+
+    rng = np.random.default_rng(random_state)  # reproducible stream shared by all trials
+    best_df = None
+    best_score = -np.inf
+
+    for _ in range(n_trials):
+        cand = dfv.sample(n=k, replace=False, weights=w, random_state=rng)
+        Z = toZ(cand[feature_cols].to_numpy(dtype=float))
+        s = _space_filling_score(Z, metric=metric)
+        if s > best_score:
+            best_score = s
+            best_df = cand
+
+    return best_df, {"score": best_score, "metric": metric, "n_trials": n_trials}
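+
+
+if __name__ == "__main__":
+    # Minimal usage sketch with made-up data and illustrative values; not part of the library API.
+    rng = np.random.default_rng(0)
+    demo = pd.DataFrame({"pH": rng.uniform(5.5, 8.5, 200),
+                         "conc": rng.uniform(0.1, 1.0, 200)})
+    bias = {"pH": [6.0, 6.5, 2.0]}  # favor rows with pH in [6.0, 6.5]
+    picked = sample_with_bias(demo, n=20, seed=0, bias=bias)
+    best, info = best_sample(demo, 10, feature_cols=["pH", "conc"],
+                             n_trials=200, bias=bias, random_state=0)
+    print(picked.describe())
+    print(info)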