Revise iterative Kafka and agent script
XPD Operator committed Dec 8, 2023
1 parent f8e07d3 commit eaaeb62
Showing 7 changed files with 263 additions and 136 deletions.
77 changes: 64 additions & 13 deletions scripts/_data_analysis.py
@@ -347,29 +347,38 @@ def _identify_multi_in_kafka(qepro_dic, metadata_dic, key_height=200, distance=1



def _fitting_in_kafka(x0, y0, data_id, peak, prop, dummy_test=False):
def _fitting_in_kafka(x0, y0, data_id, peak, prop, is_one_peak=True, dummy_test=False):
print(f'\n** Average of {data_id} has peaks at {peak}**\n')

print(f'\n** start to do peak fitting by Gaussian**\n')
if len(peak) == 1:

if is_one_peak:
f = _1gauss
M = max(prop['peak_heights'])
M_idx, _ = find_nearest(prop['peak_heights'], M)
peak = np.asarray([peak[M_idx]])
popt, _, x, y = _1peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True, dummy_test=dummy_test)
elif len(peak) == 2:
try:
f = _2gauss
popt, _, x, y = _2peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True)
except RuntimeError:

else:
if len(peak) == 1:
f = _1gauss
popt, _, x, y = _1peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True, dummy_test=dummy_test)
elif len(peak) == 2:
try:
f = _2gauss
popt, _, x, y = _2peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True)
except RuntimeError:
f = _1gauss
M = max(prop['peak_heights'])
M_idx, _ = find_nearest(prop['peak_heights'], M)
peak = np.asarray([peak[M_idx]])
popt, _, x, y = _1peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True, dummy_test=dummy_test)
else:
f = _1gauss
M = max(prop['peak_heights'])
M_idx, _ = find_nearest(prop['peak_heights'], M)
peak = np.asarray([peak[M_idx]])
popt, _, x, y = _1peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True, dummy_test=dummy_test)
else:
f = _1gauss
M = max(prop['peak_heights'])
M_idx, _ = find_nearest(prop['peak_heights'], M)
peak = np.asarray([peak[M_idx]])
popt, _, x, y = _1peak_fit_good_PL(x0, y0, f, peak=peak, raw_data=True, dummy_test=dummy_test)

shift, _ = find_nearest(x0, x[0])
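
For reference, the peak models `_1gauss` and `_2gauss` bound to `f` above are defined elsewhere in `scripts/_data_analysis.py` and are not part of this diff. A minimal sketch of the forms this dispatch assumes (the repository's exact signatures may differ):

```python
import numpy as np

# Assumed single-Gaussian peak model: amplitude, center, and width (sigma).
def _1gauss(x, height, center, sigma):
    return height * np.exp(-(x - center) ** 2 / (2 * sigma ** 2))

# Assumed two-peak model: a sum of two independent Gaussians.
def _2gauss(x, h1, c1, s1, h2, c2, s2):
    return _1gauss(x, h1, c1, s1) + _1gauss(x, h2, c2, s2)
```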

@@ -415,6 +424,48 @@ def plqy_quinine(absorbance_sample, PL_integral_sample, refractive_index_solvent



### Functions for fitting a line to baseline-correct / offset absorption spectra
def line_2D(x, slope, y_intercept):
y = x*slope + y_intercept
return y

def fit_line_2D(x, y, fit_function, x_range=[600, 900], maxfev=10000, plot=False):
x = np.asarray(x)
y = np.asarray(y)
y = np.nan_to_num(y, nan=0)

try:
idx0, _ = find_nearest(x, x_range[0])
idx1, _ = find_nearest(x, x_range[1])
except (TypeError, IndexError):
idx0 = 0
idx1 = -1

slope = (y[idx1]-y[idx0]) / (x[idx1]-x[idx0])
y_intercept = np.mean(y[idx0:idx1])

try:
initial_guess = [slope, y_intercept]
except (TypeError, IndexError):
initial_guess = [0.01, 0]

try:
popt, pcov = curve_fit(fit_function, x[idx0:idx1], y[idx0:idx1], p0=initial_guess, maxfev=maxfev)
except RuntimeError:
maxfev=1000000
popt, pcov = curve_fit(fit_function, x[idx0:idx1], y[idx0:idx1], p0=initial_guess, maxfev=maxfev)

if plot:
plt.figure()
plt.plot(x, y, label='data')
plt.plot(x, fit_function(x, popt[0], popt[1]), label=f'y={popt[0]:.4f}x+{popt[1]:.4f}')
plt.legend()

return popt, pcov
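
A usage sketch for the two helpers above on synthetic data; the wavelength grid, baseline, and noise level here are illustrative assumptions, not repository data:

```python
import numpy as np

# Synthetic absorbance trace: a shallow linear baseline plus noise.
wavelength = np.linspace(300, 1000, 1401)
absorbance = 0.05 + 1e-4 * wavelength + 1e-3 * np.random.randn(wavelength.size)

# Fit the baseline over the long-wavelength window and subtract it,
# mirroring how the Kafka consumer zeroes absorption spectra.
popt, pcov = fit_line_2D(wavelength, absorbance, line_2D, x_range=[600, 900])
absorbance_offset = absorbance - line_2D(wavelength, *popt)
```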




######## Old versions of functions #########

def _2peak_fit_PL3(x, y, distr='G', distance=30, height=930, plot=False, plot_title=None, second_peak=None, maxfev=100000):
17 changes: 14 additions & 3 deletions scripts/_data_export.py
@@ -330,7 +330,7 @@ def dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name='primar
if stream_name == 'primary':
fout = f'{csv_path}/{sample_type}_PL_{date}-{time}_{full_uid[0:8]}_fitted.csv'

elif stream_name == 'fluorescence':
elif stream_name == 'fluorescence' or stream_name == 'absorbance':
new_dir = f'{csv_path}/{date}{time}_{full_uid[0:8]}_{stream_name}'
os.makedirs(new_dir, exist_ok=True)
fout = f'{new_dir}/{sample_type}_{date}-{time}_{full_uid[0:8]}_fitted.csv'
@@ -371,9 +371,20 @@ def dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name='primar
except (TypeError, KeyError):
pass

fp.write('Wavelength,Dark,Sample,Fluorescence_mean,Fitting\n')
# fp.write('Wavelength,Dark,Sample,Fluorescence_mean,Fitting\n')
# for i in range(x_axis_data.shape[1]):
# fp.write(f'{x_axis_data[-1,i]},{dark_data[-1,i]},{sample_data[-1,i]},{output_mean[i]},{fitted_y[i]}\n')

if spectrum_type[0] == 3:
fp.write('Wavelength,Dark,Reference,Sample,Absorbance_mean,Offset\n')
else:
fp.write('Wavelength,Dark,Sample,Fluorescence_mean,Fitting\n')

for i in range(x_axis_data.shape[1]):
fp.write(f'{x_axis_data[-1,i]},{dark_data[-1,i]},{sample_data[-1,i]},{output_mean[i]},{fitted_y[i]}\n')
if spectrum_type[0] == 3:
fp.write(f'{x_axis_data[-1,i]},{dark_data[-1,i]},{reference_data[-1,i]},{sample_data[-1,i]},{output_mean[i]},{output_mean[i]-fitted_y[i]}\n')
else:
fp.write(f'{x_axis_data[-1,i]},{dark_data[-1,i]},{sample_data[-1,i]},{output_mean[i]},{fitted_y[i]}\n')



30 changes: 28 additions & 2 deletions scripts/_plot_helper.py
@@ -135,11 +135,12 @@ def plot_average_good(self, x, y, color=None, label=None, clf_limit=10):
except (IndexError):
f = plt.figure(self.fig[-1])

ax = f.gca()

ax = f.gca()
if len(list(ax.lines)) > clf_limit:
plt.clf()

ax = f.gca()

if label == None:
label = self.time + '_' + self.uid[:8]

@@ -155,6 +156,31 @@ def plot_average_good(self, x, y, color=None, label=None, clf_limit=10):
ax.legend()
f.canvas.manager.show()
f.canvas.flush_events()



def plot_offset(self, x, fit_function, popt):

# import palettable.colorbrewer.diverging as pld
# palette = pld.RdYlGn_4_r
# cmap = palette.mpl_colormap

try:
f = plt.figure(self.fig[2])
except (IndexError):
f = plt.figure('bundle_absorbance')

ax = f.gca()

ax.plot(x, fit_function(x, *popt), label=f'check baseline: {fit_function.__name__}\ny={popt[0]:.4f}x+{popt[1]:.4f}')

# # ax.set_facecolor((0.95, 0.95, 0.95))
# ax.set_xlabel('Wavelength (nm)', fontdict={'size': 14})
# ax.set_ylabel(y_label, fontdict={'size': 14})
# # ax.set_title(f'{self.date}-{self.time}_{self.uid[0:8]}_{self.stream_name}_{fit_function.__name__}')
ax.legend()
f.canvas.manager.show()
f.canvas.flush_events()



82 changes: 41 additions & 41 deletions scripts/kafka_consumer_iterate_qserver.py
@@ -92,46 +92,48 @@

from bloptools.bayesian import Agent, DOF, Objective

agent_data_path = '/home/xf28id2/data_ZnI2'

dofs = [
DOF(description="CsPb(oleate)3", name="infusion_rate_1", limits=(10, 170)),
DOF(description="TOABr", name="infusion_rate_2", limits=(10, 170)),
# DOF(description="ZnI2", name="infusion_rate_3", limits=(10, 170)),
DOF(description="ZnI2", name="infusion_rate_3", limits=(8, 120)),
]

objectives = [
Objective(description="Peak emission", name="Peak", target=516, min_snr=2, weight=1e-1),
Objective(description="Peak width", name="FWHM", target="min", min_snr=2, weight=1, log=True),
Objective(description="Quantum yield", name="PLQY", target="max", min_snr=2, weight=1, log=True),
Objective(description="Peak emission", name="Peak", target=650, weight=10, min_snr=2),
Objective(description="Peak width", name="FWHM", target="min", weight=1, min_snr=2),
Objective(description="Quantum yield", name="PLQY", target="max", weight=1e2, min_snr=2),
]


USE_AGENT = True
agent_iterate = True

agent = Agent(dofs=dofs, objectives=objectives, db=None, verbose=True)
if USE_AGENT:
agent = Agent(dofs=dofs, objectives=objectives, db=None, verbose=True)
#agent.load_data("~/blop/data/init.h5")

metadata_keys = ["time", "uid", "r_2"]
metadata_keys = ["time", "uid", "r_2"]

filepaths = glob.glob("/home/xf28id2/data/*.json")
for fp in np.array(filepaths):
with open(fp, "r") as f:
data = json.load(f)
filepaths = glob.glob(f"{agent_data_path}/*.json")
for fp in np.array(filepaths):
with open(fp, "r") as f:
data = json.load(f)

if not data.get("r_2", 0) > 0.9:
continue

x = {k:[data[k]] for k in agent.dofs.names}
y = {k:[data[k]] for k in agent.objectives.names}
metadata = {k:[data.get(k, None)] for k in metadata_keys}
agent.tell(x=x, y=y, metadata=metadata)
x = {k:[data[k]] for k in agent.dofs.names}
y = {k:[data[k]] for k in agent.objectives.names}
metadata = {k:[data.get(k, None)] for k in metadata_keys}
agent.tell(x=x, y=y, metadata=metadata)

agent._construct_models()
agent._construct_models()
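
With the historical points ingested and the models constructed, the consumer can ask the agent for the next infusion rates. The commented-out `new_points = agent.ask("qei", n=1)` later in this file suggests the acquisition call; a sketch of that step (the structure of the return value is version-dependent in bloptools and is an assumption here):

```python
# Sketch only: request one new candidate point via q-expected improvement.
if USE_AGENT and agent_iterate:
    new_points = agent.ask("qei", n=1)
    # Expected to map the DOF names above (infusion_rate_1/2/3) to
    # suggested values; the exact shape depends on the bloptools version.
```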


def print_kafka_messages(beamline_acronym, csv_path=csv_path,
key_height=key_height, height=height, distance=distance,
pump_list=pump_list, sample=sample, precursor_list=precursor_list,
mixer=mixer, dummy_test=dummy_kafka, plqy=PLQY, prefix=prefix):
mixer=mixer, dummy_test=dummy_kafka, plqy=PLQY, prefix=prefix,
agent_data_path=agent_data_path):

print(f"Listening for Kafka messages for {beamline_acronym}")
print(f'Default parameters:\n'
@@ -248,21 +250,21 @@ def print_message(consumer, doctype, doc,
x0, y0, data_id, peak, prop = da._identify_one_in_kafka(qepro_dic, metadata_dic, key_height=kh, distance=dis, height=hei, dummy_test=dummy_test)


## Get absorbance value at 365 nm
## Apply an offset to zero baseline of absorption spectra
elif stream_name == 'absorbance':
print(f'\n*** start to check whether absorbance at 365 nm in stream: {stream_name} is positive***\n')
abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0)
wavelength = qepro_dic['QEPro_x_axis'][0]
idx_365, _ = da.find_nearest(wavelength, PLQY[2])
absorbance_365 = abs_array[idx_365]
# ## break for loop, clear good_data, and let stream_name = 'primary' if absorbance_365 < 0
# if absorbance_365 < 0:
# bad_data.clear()
# good_data.clear()
# new_points = agent.ask("qei", n=1)
# stream_name = 'fluorescence'
# break

popt_abs, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[700, 900], plot=False)
abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs)

print(f'\nFitting function for baseline offset: {da.line_2D}\n')
ff_abs={'fit_function': da.line_2D, 'curve_fit': popt_abs}
de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff_abs)
u.plot_offset(wavelength, da.line_2D, popt_abs)
print(f'\n** export of offset results for absorption spectra complete **\n')


## Average scans in 'fluorescence' and identify good/bad
elif stream_name == 'fluorescence':
@@ -284,7 +286,6 @@ def print_message(consumer, doctype, doc,
if (type(peak) is np.ndarray) and (type(prop) is dict):
x, y, p, f_fit, popt = da._fitting_in_kafka(x0, y0, data_id, peak, prop, dummy_test=dummy_test)


fitted_y = f_fit(x, *popt)
r_2 = da.r_square(x, y, fitted_y)

@@ -312,19 +313,18 @@ def print_message(consumer, doctype, doc,
PL_integral_s = integrate.simpson(y,x)

## Find absorbance at 365 nm from absorbance stream
q_dic, m_dic = de.read_qepro_by_stream(uid, stream_name='absorbance', data_agent='tiled')
abs_array = q_dic['QEPro_output'][1:].mean(axis=0)
wavelength = q_dic['QEPro_x_axis'][0]
# q_dic, m_dic = de.read_qepro_by_stream(uid, stream_name='absorbance', data_agent='tiled')
# abs_array = q_dic['QEPro_output'][1:].mean(axis=0)
# wavelength = q_dic['QEPro_x_axis'][0]

idx1, _ = da.find_nearest(wavelength, PLQY[2])
absorbance_s = abs_array[idx1]
absorbance_s = abs_array_offset[idx1]

if PLQY[1] == 'fluorescein':
plqy = da.plqy_fluorescein(absorbance_s, PL_integral_s, 1.506, *PLQY[3:])
else:
plqy = da.plqy_quinine(absorbance_s, PL_integral_s, 1.506, *PLQY[3:])

if absorbance_s < 0:
plqy = 0.000001

plqy_dic = {'PL_integral':PL_integral_s, 'Absorbance_365':absorbance_s, 'plqy': plqy}
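
`da.plqy_fluorescein` and `da.plqy_quinine` are not shown in this diff; both presumably implement the standard relative quantum-yield method against a reference dye. A sketch of that relation (the argument order and any reference values here are assumptions, not the repository's API):

```python
# Relative PLQY of a sample (s) against a reference fluorophore (ref):
#   PLQY_s = PLQY_ref * (I_s / I_ref) * (A_ref / A_s) * (n_s / n_ref)**2
# where I is the integrated PL, A the absorbance at the excitation
# wavelength (365 nm here), and n the solvent refractive index.
def plqy_relative(A_s, I_s, n_s, A_ref, I_ref, n_ref, plqy_ref):
    return plqy_ref * (I_s / I_ref) * (A_ref / A_s) * (n_s / n_ref) ** 2
```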

@@ -333,7 +333,7 @@ def print_message(consumer, doctype, doc,
## Unify the unit of infuse rate as 'ul/min'
ruc_0 = sq.rate_unit_converter(r0 = metadata_dic["infuse_rate_unit"][0], r1 = 'ul/min')
ruc_1 = sq.rate_unit_converter(r0 = metadata_dic["infuse_rate_unit"][1], r1 = 'ul/min')
# ruc_2 = sq.rate_unit_converter(r0 = metadata_dic["infuse_rate_unit"][2], r1 = 'ul/min')
ruc_2 = sq.rate_unit_converter(r0 = metadata_dic["infuse_rate_unit"][2], r1 = 'ul/min')

# data_for_agent = {'infusion_rate_1': metadata_dic["infuse_rate"][0]*ruc_0,
# 'infusion_rate_2': metadata_dic["infuse_rate"][1]*ruc_1,
@@ -347,12 +347,12 @@ def print_message(consumer, doctype, doc,

agent_data["infusion_rate_1"] = metadata_dic["infuse_rate"][0]*ruc_0
agent_data["infusion_rate_2"] = metadata_dic["infuse_rate"][1]*ruc_1
# agent_data["infusion_rate_3"] = metadata_dic["infuse_rate"][2]*ruc_2
agent_data["infusion_rate_3"] = metadata_dic["infuse_rate"][2]*ruc_2

with open(f"/home/xf28id2/data/{data_id}.json", "w") as f:
with open(f"{agent_data_path}/{data_id}.json", "w") as f:
json.dump(agent_data, f)

print("\nwrote to ~/data")
print(f"\nwrote to {agent_data_path}")


### Three parameters for ML: peak_emission, fwhm, plqy
(Diffs for the remaining 3 changed files are not shown.)