Skip to content
133 changes: 85 additions & 48 deletions nuc_morph_analysis/analyses/volume/add_growth_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import matplotlib.pyplot as plt
import seaborn as sb
from scipy.optimize import curve_fit
from nuc_morph_analysis.lib.preprocessing.curve_fitting import powerfunc
from nuc_morph_analysis.lib.preprocessing.curve_fitting import powerfunc, line, exponential
from nuc_morph_analysis.lib.visualization.notebook_tools import save_and_show_plot
from nuc_morph_analysis.lib.visualization.plotting_tools import get_plot_labels_for_metric

Expand Down Expand Up @@ -52,22 +52,24 @@ def add_early_growth_rate(df, interval, flag_dropna=False):
return df


def fit_tracks_to_time_powerlaw(
def fit_tracks_to_model(
df,
feature_col,
interval,
model="power",
plot=False,
):
"""
This function fits the volume of each track to either a power law, exponential or
linear model and adds the fit parameters and RMSE of the fit to the input dataframe.

Parameters
----------
df : DataFrame
full tracks dataframe
feature_col: str
Name of column with feature to fit
interval : float
The time interval in minutes
model : str, optional
The model to fit to. The default is "power". Other options are "exponential" and "linear".
plot : bool, optional
If True, a plot of the volume vs time for each track and its exponential fit is displayed. The default is False.

Expand All @@ -77,31 +79,33 @@ def fit_tracks_to_time_powerlaw(
The input dataframe with an additional columns with linearity fit parameters added
"""

# get parameters based on fitting volume or SA
if feature_col == "volume":
short = "volume"
atB_0 = 550
# get parameters based on fitting volume
atB_0 = 550
t_scale0 = np.nan
model_name = model
if model == "power":
rate_0 = 35
tscale_0 = 1
elif feature_col == "mesh_sa":
short = "SA"
atB_0 = 400
rate_0 = 15
tscale_0 = 1
model_name = "linearity"
modelfunc = powerfunc
elif model == "exponential":
rate_0 = 0.05
modelfunc = exponential
elif model == "linear":
rate_0 = 50
modelfunc = line
else:
raise ValueError(
"Function currently only designed to work \
with volume and mesh_sa.\
To fit antoher column, update this function."
"Function currently only fits volumes to power law, exponential or linear model."
)
trackdir = f"volume/figures/linearity/{short}/tracks/"
yscale, ylabel, yunits, _ = get_plot_labels_for_metric(feature_col)

yscale, ylabel, yunits, _ = get_plot_labels_for_metric("volume")

features = [
f"tscale_linearityfit_{short}",
f"atB_linearityfit_{short}",
f"rate_linearityfit_{short}",
f"RMSE_linearityfit_{short}",
f"tscale_{model_name}fit_volume",
f"atB_{model_name}fit_volume",
f"rate_{model_name}fit_volume",
f"RMSE_{model_name}fit_volume",
]
for feature in features:
df[feature] = np.nan
Expand All @@ -121,21 +125,25 @@ def fit_tracks_to_time_powerlaw(
# get trimmed track times and volumes
x = df_track_trim["index_sequence"].values * interval / 60
x -= x[0]
y = df_track_trim[feature_col].values * yscale
y = df_track_trim["volume"].values * yscale

# fit trimmed track to model with initial guesses
popt, _ = curve_fit(powerfunc, x, y, [rate_0, atB_0, tscale_0])

if model == "power":
popt, _ = curve_fit(modelfunc, x, y, [rate_0, atB_0, tscale_0])
tscale = popt[2]
else:
popt, _ = curve_fit(modelfunc, x, y, [rate_0, atB_0])
tscale = np.nan

# get fits parameters, residuals and mse
z = powerfunc(x, *popt)
res = z - y
rmse = np.sqrt(np.nanmean(res**2))
rate = popt[0]
atB = popt[1]
tscale = popt[2]

z = modelfunc(x, *popt)
res = z - y
rmse = np.sqrt(np.nanmean(res**2))

# plot real track and fitted model
if plot is True:
if plot is True and model == "power":
plt.plot(x, y, label=f"Track ID: {track}", c="grey", alpha=0.5)
plt.plot(
x,
Expand All @@ -149,26 +157,27 @@ def fit_tracks_to_time_powerlaw(
plt.ylabel(f"{ylabel} {yunits}")
plt.xlabel("Time (hr)")
plt.tight_layout()
trackdir = f"volume/figures/linearity/volume/tracks/"
save_and_show_plot(f"{trackdir}/track{track}", quiet=True)
plt.close()

# add fit parameters and error to manifest
df.loc[df_track.index, f"tscale_linearityfit_{short}"] = tscale
df.loc[df_track.index, f"atB_linearityfit_{short}"] = atB
df.loc[df_track.index, f"rate_linearityfit_{short}"] = rate
df.loc[df_track.index, f"RMSE_linearityfit_{short}"] = rmse
df.loc[df_track.index, f"tscale_{model_name}fit_volume"] = tscale
df.loc[df_track.index, f"atB_{model_name}fit_volume"] = atB
df.loc[df_track.index, f"rate_{model_name}fit_volume"] = rate
df.loc[df_track.index, f"RMSE_{model_name}fit_volume"] = rmse

except Exception:
fail_count += 1

if fail_count > 0:
print(f"Failed {short} linearity fit count: {fail_count}")
print(f"Failed volume {model} model fit count: {fail_count}")

return df


def plot_fit_parameter_distribution(
df_all, figdir, feature, by_colony_flag=False, density_flag=True, nbins=20
df_all, figdir, feature, by_colony_flag=False, density_flag=True, allmodels_flag=False, nbins=20
):
"""
This function plots the mean volume fold change for
Expand All @@ -181,10 +190,14 @@ def plot_fit_parameter_distribution(
Dataframe containing full-track data from all colony datasets
figdir: path
Path to directory to save all figures for this script
feature: str
Name of feature to plot
by_colony_flag: bool
Flag for whether to separate data out by colony
density_flag: bool
Flag for whether to use density instead of histogram
allmodels_flag: bool
Flag for whether to plot all three volume model errors (power law, exponential, linear)
nbins: int
Number of bins for histogram
"""
Expand Down Expand Up @@ -212,31 +225,55 @@ def plot_fit_parameter_distribution(

_, ax = plt.subplots(1, 1, figsize=(5, 4), dpi=300)

for color, colony, label, ind in zip(colors, colonies, labels, [0, 2, 4, 6]):
if not allmodels_flag:

for color, colony, label, ind in zip(colors, colonies, labels, [0, 2, 4, 6]):

if by_colony_flag and colony != "all_baseline":
df = df_all[df_all["colony"] == colony]
else:
df = df_all

if density_flag:
# get kde and find feature value giving max to add to legend
sb.kdeplot(df[feature], color=color, ax=ax)
x = ax.lines[ind].get_xdata() # Get the x data of the distribution
y = ax.lines[ind].get_ydata() # Get the y data of the distribution
maxid = np.argmax(y) # The id of the peak (maximum of y data)
label += f" ({np.round(x[maxid],2)})"
# replot kde now with complete label
sb.kdeplot(df[feature], color=color, label=label, ax=ax)

else:
plt.hist(df[feature], histtype="step", bins=nbins, color=color, label=label)

else:

for color, model, model_name, ind in zip(
["k", "r", "b"],
["linearity", "linear", "exponential"],
["Power law", "Linear", "Exponential"],
[0, 2, 4]):

if by_colony_flag and colony != "all_baseline":
df = df_all[df_all["colony"] == colony]
else:
df = df_all

if density_flag:
# get kde and find feature value giving max to add to legend
feature = "RMSE_" + model + "fit_volume"
sb.kdeplot(df[feature], color=color, ax=ax)
x = ax.lines[ind].get_xdata() # Get the x data of the distribution
y = ax.lines[ind].get_ydata() # Get the y data of the distribution
maxid = np.argmax(y) # The id of the peak (maximum of y data)
label += f" ({np.round(x[maxid],2)})"
label = f"{model_name} ({np.round(x[maxid],2)})"
# replot kde now with complete label
sb.kdeplot(df[feature], color=color, label=label, ax=ax)

else:
plt.hist(df[feature], histtype="step", bins=nbins, color=color, label=label)

if "tscale" in feature:
plt.axvline(1, color="k", linestyle=":")
plt.xlim(0, 3)
if "RMSE" in feature:
plt.axvline(5.49, color="k", linestyle=":", label=f"Error (5.49 {xunits[1:-1]})")

plt.axvline(5.54, color="k", linestyle=":", label=f"Segmnetation error (5.54 {xunits[1:-1]})")
plt.xlim(0, 50)
plt.xlabel(f"{xlabel} {xunits}")
plt.ylabel(ylabel)
plt.legend(prop={"size": 12})
Expand Down
30 changes: 20 additions & 10 deletions nuc_morph_analysis/analyses/volume/local_growth_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,27 @@
plt.close()


# %% Plot distribution of tscale fitted values and root mean squared error
# %% Plot distribution of tscale fitted values
# both pooled for all baseline colonies and separated by colony
for feature in ["tscale_linearityfit_volume", "RMSE_linearityfit_volume"]:
for by_colony_flag in [True, False]:
plot_fit_parameter_distribution(
df_track_level_features,
figdir,
feature,
by_colony_flag=by_colony_flag,
density_flag=True,
)
for by_colony_flag in [True, False]:
plot_fit_parameter_distribution(
df_track_level_features,
figdir,
"tscale_linearityfit_volume",
by_colony_flag=by_colony_flag,
density_flag=True,
allmodels_flag=False,
)

# Plot distribution of RMSE values for all fits
plot_fit_parameter_distribution(
df_track_level_features,
figdir,
"RMSE",
by_colony_flag=by_colony_flag,
density_flag=True,
allmodels_flag=True,
)

# %% plot relationships of alpha to fold change and colony time for all data pooled
for feature in ["volume_fold_change_BC", "colony_time_at_B"]:
Expand Down
24 changes: 23 additions & 1 deletion nuc_morph_analysis/lib/preprocessing/curve_fitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,32 @@ def line(x, a, b):
return a * x + b


def exponential(x, a, b):
"""
This function gives the y values of an exponetial
of the form y = b * e ^ (x * a)

Parameters
----------
x: Number or ndarray
The x values
a: Number
Scaling of exponent with time
b: Number
y-intercept of the line

Returns
-------
Number or ndarray
y = b * e ^ (x * a)
"""
return b * np.exp(x * a)


def powerfunc(x, a, b, c):
"""
This function gives the y values of power law
of the form y = a * x + b
of the form y = a * (x^c) + b

Parameters
----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ def process_full_tracks(df_all, thresh, pix_size, interval):
df_full = add_features.add_fold_change_track_fromB(df_full, "SA", "mesh_sa", pix_size**2)
df_full = add_growth_features.add_early_growth_rate(df_full, interval)
df_full = add_growth_features.add_late_growth_rate_by_endpoints(df_full)
df_full = add_growth_features.fit_tracks_to_time_powerlaw(df_full, "volume", interval)
df_full = add_growth_features.fit_tracks_to_model(df_full, interval, "power")
df_full = add_growth_features.fit_tracks_to_model(df_full, interval, "exponential")
df_full = add_growth_features.fit_tracks_to_model(df_full, interval, "linear")

# For LRM
df_full = add_features.add_lineage_features(df_full, feature_list=['volume_at_B', 'duration_BC', 'volume_at_C', 'delta_volume_BC'])
Expand Down