From 908a12fe04359a01d248664bb44754cd929571bb Mon Sep 17 00:00:00 2001
From: XPD Operator
Date: Mon, 26 Feb 2024 13:58:23 -0500
Subject: [PATCH] Update kafka for publisher

---
 scripts/kafka_consumer_publisher.py           | 403 ++++++++++++++++++
 scripts/{ => notes}/DEBUSSY_plot_01.ipynb     |   0
 scripts/{ => notes}/qserver_zmq_001.ipynb     |   0
 scripts/{ => notes}/read_db_catalog.ipynb     |   0
 scripts/prepare_agent.py                      |   2 +-
 scripts/{ => tests}/kafka_consumer_single.py  |   0
 .../kafka_consumer_with_qserver.py            |   0
 .../kafka_consumer_with_qserver_dummy.py      |   0
 scripts/{ => tests}/print_kafka_with_main.py  |   0
 scripts/{ => tests}/print_kafka_with_main2.py |   0
 scripts/{ => tests}/print_kafka_with_main3.py |   0
 .../print_kafka_with_main_consumer.py         |   0
 scripts/{ => tests}/queue_test_001.py         |   0
 scripts/{ => tests}/queue_test_002.py         |   0
 scripts/{ => tests}/queue_test_002_ex.py      |   0
 scripts/{ => tests}/queue_test_003.py         |   0
 scripts/{ => tests}/queue_test_004.py         |   0
 17 files changed, 404 insertions(+), 1 deletion(-)
 create mode 100644 scripts/kafka_consumer_publisher.py
 rename scripts/{ => notes}/DEBUSSY_plot_01.ipynb (100%)
 rename scripts/{ => notes}/qserver_zmq_001.ipynb (100%)
 rename scripts/{ => notes}/read_db_catalog.ipynb (100%)
 rename scripts/{ => tests}/kafka_consumer_single.py (100%)
 rename scripts/{ => tests}/kafka_consumer_with_qserver.py (100%)
 rename scripts/{ => tests}/kafka_consumer_with_qserver_dummy.py (100%)
 rename scripts/{ => tests}/print_kafka_with_main.py (100%)
 rename scripts/{ => tests}/print_kafka_with_main2.py (100%)
 rename scripts/{ => tests}/print_kafka_with_main3.py (100%)
 rename scripts/{ => tests}/print_kafka_with_main_consumer.py (100%)
 rename scripts/{ => tests}/queue_test_001.py (100%)
 rename scripts/{ => tests}/queue_test_002.py (100%)
 rename scripts/{ => tests}/queue_test_002_ex.py (100%)
 rename scripts/{ => tests}/queue_test_003.py (100%)
 rename scripts/{ => tests}/queue_test_004.py (100%)

diff --git a/scripts/kafka_consumer_publisher.py b/scripts/kafka_consumer_publisher.py
new file mode 100644
index 0000000..9b58dd4
--- /dev/null
+++ b/scripts/kafka_consumer_publisher.py
@@ -0,0 +1,403 @@
+import os
+import datetime
+import pprint
+import uuid
+# from bluesky_kafka import RemoteDispatcher
+from bluesky_kafka.consume import BasicConsumer
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+from scipy import integrate
+import time
+import databroker
+import json
+import glob
+from tqdm import tqdm
+
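+# Raise the open-file limit; presumably a long-running consumer that keeps
+# many matplotlib figures and databroker file handles open can otherwise
+# exhaust the default per-process descriptor limit.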
+import resource
+resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536))
+
+import _data_export as de
+from _plot_helper import plot_uvvis
+import _data_analysis as da
+import _synthesis_queue as sq
+
+from bluesky_queueserver.manager.comms import zmq_single_request
+
+# db = databroker.Broker.named('xpd-ldrd20-31')
+# catalog = databroker.catalog['xpd-ldrd20-31']
+
+try:
+    from nslsii import _read_bluesky_kafka_config_file  # nslsii <0.7.0
+except (ImportError, AttributeError):
+    from nslsii.kafka_utils import _read_bluesky_kafka_config_file  # nslsii >=0.7.0
+
+# these two lines allow a stale plot to remain interactive and prevent
+# the current plot from stealing focus. thanks to Tom:
+# https://nsls2.slack.com/archives/C02D9V72QH1/p1674589090772499
+plt.ion()
+plt.rcParams["figure.raise_window"] = False
+
+## Input variables: read from inputs_kafka_single.xlsx
+xlsx = '/home/xf28id2/Documents/ChengHung/inputs_kafka_single.xlsx'
+input_dic = de._read_input_xlsx(xlsx)
+
+##################################################################
+# Define namespace for tasks in Qserver and Kafka
+dummy_kafka = bool(input_dic['dummy_test'][0])
+# dummy_qserver = bool(input_dic['dummy_test'][1])
+csv_path = input_dic['csv_path'][0]
+key_height = input_dic['key_height']
+height = input_dic['height']
+distance = input_dic['distance']
+# pump_list = input_dic['pump_list']
+# precursor_list = input_dic['precursor_list']
+# syringe_mater_list = input_dic['syringe_mater_list']
+# syringe_list = input_dic['syringe_list']
+# target_vol_list = input_dic['target_vol_list']
+# set_target_list = input_dic['set_target_list']
+# infuse_rates = input_dic['infuse_rates']
+# sample = input_dic['sample']
+# mixer = input_dic['mixer']
+# wash_tube = input_dic['wash_tube']
+# resident_t_ratio = input_dic['resident_t_ratio'][0]
+PLQY = input_dic['PLQY']
+# prefix = input_dic['prefix']
+# num_uvvis = input_dic['num_uvvis']
+###################################################################
+
+from blop import Agent, DOF, Objective
+# agent_data_path = '/home/xf28id2/data_ZnCl2'
+# agent_data_path = '/home/xf28id2/data_ZnI2_60mM'
+# agent_data_path = '/home/xf28id2/data_halide'
+agent_data_path = '/home/xf28id2/data_dilute_halide'
+
+write_agent_data = False
+
+
+def print_kafka_messages(beamline_acronym, csv_path=csv_path,
+                         key_height=key_height, height=height, distance=distance,
+                         dummy_test=dummy_kafka, plqy=PLQY,
+                         agent_data_path=agent_data_path,):
+
+    print(f"Listening for Kafka messages for {beamline_acronym}")
+    print(f'Default parameters:\n'
+          f'   csv path: {csv_path}\n'
+          f'   key height: {key_height}\n'
+          f'   height: {height}\n'
+          f'   distance: {distance}\n')
+
+    global db, catalog
+    db = databroker.Broker.named(beamline_acronym)
+    catalog = databroker.catalog[f'{beamline_acronym}']
+
+    import palettable.colorbrewer.diverging as pld
+    palette = pld.RdYlGn_4_r
+    cmap = palette.mpl_colormap
+    # color_idx = np.linspace(0, 1, len(sample))
+
+    # plt.figure()
+    # def print_message(name, doc):
+    def print_message(consumer, doctype, doc,
+                      bad_data=[], good_data=[], check_abs365=False, finished=[],
+                      agent_iteration=[], ):
+        # pump_list=pump_list, sample=sample, precursor_list=precursor_list,
+        # mixer=mixer, dummy_test=dummy_test):
+        name, message = doc
+        # print(
+        #     f"{datetime.datetime.now().isoformat()} document: {name}\n"
+        #     f"document keys: {list(message.keys())}\n"
+        #     f"contents: {pprint.pformat(message)}\n"
+        # )
+        if name == 'start':
+            print(f"{datetime.datetime.now().isoformat()} document: {name}\n"
+                  f"document keys: {list(message.keys())}")
+
+            if 'uid' in message.keys():
+                print(f"uid: {message['uid']}")
+            if 'plan_name' in message.keys():
+                print(f"plan name: {message['plan_name']}")
+            if 'detectors' in message.keys():
+                print(f"detectors: {message['detectors']}")
+            if 'pumps' in message.keys():
+                print(f"pumps: {message['pumps']}")
+            # if 'detectors' in message.keys():
+            #     print(f"detectors: {message['detectors']}")
+            if 'uvvis' in message.keys() and message['plan_name'] != 'count':
+                print(f"uvvis mode:\n"
+                      f"   integration time: {message['uvvis'][0]} ms\n"
+                      f"   num spectra averaged: {message['uvvis'][1]}\n"
+                      f"   buffer capacity: {message['uvvis'][2]}"
+                      )
+            elif 'uvvis' in message.keys() and message['plan_name'] == 'count':
+                print(f"uvvis mode:\n"
+                      f"   spectrum type: {message['uvvis'][0]}\n"
+                      f"   integration time: {message['uvvis'][2]} ms\n"
+                      f"   num spectra averaged: {message['uvvis'][3]}\n"
+                      f"   buffer capacity: {message['uvvis'][4]}"
+                      )
+            if 'mixer' in message.keys():
+                print(f"mixer: {message['mixer']}")
+            if 'sample_type' in message.keys():
+                print(f"sample type: {message['sample_type']}")
+
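+        # The heavy lifting below happens on the 'stop' document: only after
+        # a run ends is its full data readable from databroker, hence the
+        # short sleep before exporting.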
+        if name == 'stop':
+            # zmq_single_request(method='queue_stop')
+            print('\n*** qserver stop for data export, identification, and fitting ***\n')
+
+            print(f"{datetime.datetime.now().isoformat()} document: {name}\n"
+                  f"contents: {pprint.pformat(message)}"
+                  )
+            # num_events = len(message['num_events'])
+
+            ## wait 2 seconds for databroker to save data
+            time.sleep(2)
+            uid = message['run_start']
+            print(f'\n**** start to export uid: {uid} ****\n')
+            # print(list(message['num_events'].keys())[0])
+            stream_list = list(message['num_events'].keys())
+
+            ## Set good/bad data conditions for the corresponding sample
+            try:
+                kh = key_height[len(finished)]
+                hei = height[len(finished)]
+                dis = distance[len(finished)]
+            except IndexError:
+                kh = key_height[0]
+                hei = height[0]
+                dis = distance[0]
+
+            ## remove 'scattering' from stream_list to avoid redundant work in next for loop
+            if 'scattering' in stream_list:
+                stream_list.remove('scattering')
+
+            ## Export, plotting, fitting, calculate # of good/bad data, add queue item
+            for stream_name in stream_list:
+                ## Read data from databroker and turn into dic
+                qepro_dic, metadata_dic = de.read_qepro_by_stream(uid, stream_name=stream_name, data_agent='tiled')
+                sample_type = metadata_dic['sample_type']
+                ## Save data in dic into .csv file
+                de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name)
+                print(f'\n** export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(csv_path)} **\n')
+                ## Plot data in dic
+                u = plot_uvvis(qepro_dic, metadata_dic)
+                if len(good_data) == 0 and len(bad_data) == 0:
+                    clear_fig = True
+                else:
+                    clear_fig = False
+                u.plot_data(clear_fig=clear_fig)
+                print(f'\n** Plot {stream_name} in uid: {uid[0:8]} complete **\n')
+
+                ## Identify good/bad data if it is a fluorescence scan in 'primary'
+                if qepro_dic['QEPro_spectrum_type'][0] == 2 and stream_name == 'primary':
+                    print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n')
+                    x0, y0, data_id, peak, prop = da._identify_one_in_kafka(qepro_dic, metadata_dic, key_height=kh, distance=dis, height=hei, dummy_test=dummy_test)
+
+                ## Apply an offset to zero the baseline of the absorption spectra
+                elif stream_name == 'absorbance':
+                    print(f'\n*** start to check if absorbance at 365 nm in stream: {stream_name} is positive or not ***\n')
+                    abs_array = qepro_dic['QEPro_output'][1:].mean(axis=0)
+                    wavelength = qepro_dic['QEPro_x_axis'][0]
+
+                    popt_abs, _ = da.fit_line_2D(wavelength, abs_array, da.line_2D, x_range=[750, 950], plot=False)
+                    abs_array_offset = abs_array - da.line_2D(wavelength, *popt_abs)
+
+                    print(f'\nFitting function for baseline offset: {da.line_2D}\n')
+                    ff_abs = {'fit_function': da.line_2D, 'curve_fit': popt_abs}
+                    de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff_abs)
+                    u.plot_offfset(wavelength, da.line_2D, popt_abs)
+                    print(f'\n** export offset results of absorption spectra complete **\n')
+
+                ## Average scans in 'fluorescence' and identify good/bad
+                elif stream_name == 'fluorescence':
+                    print(f'\n*** start to identify good/bad data in stream: {stream_name} ***\n')
+                    x0, y0, data_id, peak, prop = da._identify_multi_in_kafka(qepro_dic, metadata_dic, key_height=kh, distance=dis, height=hei, dummy_test=dummy_test)
+                    label_uid = f'{uid[0:8]}_{metadata_dic["sample_type"]}'
+                    # u.plot_average_good(x0, y0, color=cmap(color_idx[sub_idx]), label=label_uid)
+                    # sub_idx = sample.index(metadata_dic['sample_type'])
+                    u.plot_average_good(x0, y0, label=label_uid, clf_limit=9)
+
+                ## Skip peak fitting if qepro type is absorbance
+                if qepro_dic['QEPro_spectrum_type'][0] == 3:
+                    print(f"\n*** No need to carry out fitting for {stream_name} in uid: {uid[:8]} ***\n")
+
+                else:
+                    ## for good data, type(peak) will be a np.ndarray and type(prop) a dict
+                    ## fit the good data, export/plot fitting results
+                    ## append data_id into good_data or bad_data to keep count
+                    if (type(peak) is np.ndarray) and (type(prop) is dict):
+                        x, y, p, f_fit, popt = da._fitting_in_kafka(x0, y0, data_id, peak, prop, dummy_test=dummy_test)
+
+                        fitted_y = f_fit(x, *popt)
+                        r2_idx1, _ = da.find_nearest(x, popt[1] - 3*popt[2])
+                        r2_idx2, _ = da.find_nearest(x, popt[1] + 3*popt[2])
+                        r_2 = da.r_square(x[r2_idx1:r2_idx2], y[r2_idx1:r2_idx2], fitted_y[r2_idx1:r2_idx2], y_low_limit=0)
+
+                        metadata_dic["r_2"] = r_2
+
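+                        # For a Gaussian fit, FWHM = 2*sqrt(2*ln(2))*sigma ~= 2.355*sigma,
+                        # hence the conversion constant below; other fit functions are
+                        # assumed to parameterize the peak width as FWHM directly.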
+                        if 'gauss' in f_fit.__name__:
+                            constant = 2.355
+                        else:
+                            constant = 1
+
+                        intensity_list = []
+                        peak_list = []
+                        fwhm_list = []
+                        for i in range(int(len(popt)/3)):
+                            intensity_list.append(popt[i*3+0])
+                            peak_list.append(popt[i*3+1])
+                            fwhm_list.append(popt[i*3+2]*constant)
+
+                        peak_emission_id = np.argmax(np.asarray(intensity_list))
+                        peak_emission = peak_list[peak_emission_id]
+                        fwhm = fwhm_list[peak_emission_id]
+
+                        ## Calculate PLQY for fluorescence stream
+                        if (stream_name == 'fluorescence') and (PLQY[0] == 1):
+                            PL_integral_s = integrate.simpson(y, x=x)
+                            label_uid = f'{uid[0:8]}_{metadata_dic["sample_type"]}'
+                            u.plot_CsPbX3(x, y, peak_emission, label=label_uid, clf_limit=9)
+
+                            ## Find absorbance at 365 nm from absorbance stream
+                            # q_dic, m_dic = de.read_qepro_by_stream(uid, stream_name='absorbance', data_agent='tiled')
+                            # abs_array = q_dic['QEPro_output'][1:].mean(axis=0)
+                            # wavelength = q_dic['QEPro_x_axis'][0]
+
+                            idx1, _ = da.find_nearest(wavelength, PLQY[2])
+                            absorbance_s = abs_array_offset[idx1]
+
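+                            # Relative PLQY against a reference dye, i.e.
+                            #   PLQY_s = PLQY_ref * (I_s/I_ref) * (A_ref/A_s) * (n_s/n_ref)**2
+                            # where I is the integrated emission, A the absorbance at the
+                            # excitation wavelength, and n the refractive index (1.506 is
+                            # presumably the sample solvent's).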
+                            if PLQY[1] == 'fluorescein':
+                                plqy = da.plqy_fluorescein(absorbance_s, PL_integral_s, 1.506, *PLQY[3:])
+                            else:
+                                plqy = da.plqy_quinine(absorbance_s, PL_integral_s, 1.506, *PLQY[3:])
+
+                            plqy_dic = {'PL_integral': PL_integral_s, 'Absorbance_365': absorbance_s, 'plqy': plqy}
+
+                            optical_property = {'Peak': peak_emission, 'FWHM': fwhm, 'PLQY': plqy}
+
+                            # Unify the unit of infuse rate as 'ul/min'
+                            try:
+                                ruc_0 = sq.rate_unit_converter(r0=metadata_dic["infuse_rate_unit"][0], r1='ul/min')
+                                ruc_1 = sq.rate_unit_converter(r0=metadata_dic["infuse_rate_unit"][1], r1='ul/min')
+                                ruc_2 = sq.rate_unit_converter(r0=metadata_dic["infuse_rate_unit"][2], r1='ul/min')
+                                ruc_3 = sq.rate_unit_converter(r0=metadata_dic["infuse_rate_unit"][3], r1='ul/min')
+                            except (KeyError, IndexError):
+                                pass
+
+                            ## Check if pump is Infusing or Idle
+                            is_infusing = []
+                            for pump_status in metadata_dic['pump_status']:
+                                if pump_status == 'Infusing':
+                                    is_infusing.append(1)
+                                elif pump_status == 'Idle':
+                                    is_infusing.append(0)
+
+                            if write_agent_data:
+                                agent_data = {}
+
+                                agent_data.update(optical_property)
+                                agent_data.update({k: v for k, v in metadata_dic.items() if len(np.atleast_1d(v)) == 1})
+                                # agent_data.update({k:v for k, v in metadata_dic.items()})
+
+                                agent_data["infusion_rate_CsPb"] = metadata_dic["infuse_rate"][0]*ruc_0*is_infusing[0]
+                                agent_data["infusion_rate_Br"] = metadata_dic["infuse_rate"][1]*ruc_1*is_infusing[1]
+                                agent_data["infusion_rate_Cl"] = 0.0
+                                # agent_data["infusion_rate_I2"] = 0.0
+                                # agent_data["infusion_rate_Cl"] = metadata_dic["infuse_rate"][2]*ruc_2*is_infusing[2]
+                                # agent_data["infusion_rate_I2"] = metadata_dic["infuse_rate"][3]*ruc_2*is_infusing[3]
+                                agent_data["infusion_rate_I2"] = metadata_dic["infuse_rate"][2]*ruc_2*is_infusing[2]
+                                agent_data["infusion_rate_Tol"] = metadata_dic["infuse_rate"][3]*ruc_3*is_infusing[3]
+
+                                with open(f"{agent_data_path}/{data_id}.json", "w") as f:
+                                    json.dump(agent_data, f)
+
+                                print(f"\nwrote to {agent_data_path}")
+
+                            ### Three parameters for ML: peak_emission, fwhm, plqy
+                            # TODO: add ML agent code here
+                            data = {}
+
+                        else:
+                            plqy_dic = None
+                            optical_property = None
+
+                        print(f'\nFitting function: {f_fit}\n')
+                        ff = {'fit_function': f_fit, 'curve_fit': popt}
+                        de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff, plqy_dic=plqy_dic)
+                        print(f'\n** export fitting results complete **\n')
+
+                        u.plot_peak_fit(x, y, f_fit, popt, peak=p, fill_between=True)
+                        print(f'\n** plot fitting results complete **\n')
+                        if stream_name == 'primary':
+                            good_data.append(data_id)
+
+                    elif peak == [] and prop == []:
+                        bad_data.append(data_id)
+                        print(f"\n*** No need to carry out fitting for {stream_name} in uid: {uid[:8]} ***\n")
+                        print(f"\n*** since {stream_name} in uid: {uid[:8]} is bad data. ***\n")
+
+                print('\n*** export, identify good/bad, fitting complete ***\n')
+
+                try:
+                    print(f"\n*** {sample_type} of uid: {uid[:8]} has: {optical_property}. ***\n")
+                except UnboundLocalError:
+                    pass
+
+            print(f'*** Accumulated num of good data: {len(good_data)} ***\n')
+            print(f'good_data = {good_data}\n')
+            print(f'*** Accumulated num of bad data: {len(bad_data)} ***\n')
+            print('########### Events printing division ############\n')
+
+    kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")
+
+    # this consumer should not be in a group with other consumers
+    # so generate a unique consumer group id for it
+    unique_group_id = f"echo-{beamline_acronym}-{str(uuid.uuid4())[:8]}"
+
+    kafka_consumer = BasicConsumer(
+        topics=[f"{beamline_acronym}.bluesky.runengine.documents"],
+        bootstrap_servers=kafka_config["bootstrap_servers"],
+        group_id=unique_group_id,
+        consumer_config=kafka_config["runengine_producer_config"],
+        process_message=print_message,
+    )
+
+    try:
+        kafka_consumer.start_polling(work_during_wait=lambda: plt.pause(.1))
+    except KeyboardInterrupt:
+        print('\nExiting Kafka consumer')
+        return
+
+
+if __name__ == "__main__":
+    import sys
+    print_kafka_messages(sys.argv[1])
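+
+# Usage sketch: pass the beamline acronym that names both the databroker
+# catalog and the Kafka topic prefix, e.g.
+#     python kafka_consumer_publisher.py xpd-ldrd20-31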
diff --git a/scripts/DEBUSSY_plot_01.ipynb b/scripts/notes/DEBUSSY_plot_01.ipynb
similarity index 100%
rename from scripts/DEBUSSY_plot_01.ipynb
rename to scripts/notes/DEBUSSY_plot_01.ipynb
diff --git a/scripts/qserver_zmq_001.ipynb b/scripts/notes/qserver_zmq_001.ipynb
similarity index 100%
rename from scripts/qserver_zmq_001.ipynb
rename to scripts/notes/qserver_zmq_001.ipynb
diff --git a/scripts/read_db_catalog.ipynb b/scripts/notes/read_db_catalog.ipynb
similarity index 100%
rename from scripts/read_db_catalog.ipynb
rename to scripts/notes/read_db_catalog.ipynb
diff --git a/scripts/prepare_agent.py b/scripts/prepare_agent.py
index aa42bb7..a88d7b0 100644
--- a/scripts/prepare_agent.py
+++ b/scripts/prepare_agent.py
@@ -30,7 +30,7 @@ def build_agen(peak_target=660, peak_tolerance=5):
     dofs = [
-        DOF(description="CsPb(oleate)3", name="infusion_rate_CsPb", units="uL/min", search_bounds=(10, 110)),
+        DOF(description="CsPb(oleate)3", name="infusion_rate_CsPb", units="uL/min", search_bounds=(2, 110)),
         DOF(description="TOABr", name="infusion_rate_Br", units="uL/min", search_bounds=(50, 200)),
         DOF(description="ZnCl2", name="infusion_rate_Cl", units="uL/min", search_bounds=(0, Cl_up_limit)),
         DOF(description="ZnI2", name="infusion_rate_I2", units="uL/min", search_bounds=(0, I_up_limit)),
diff --git a/scripts/kafka_consumer_single.py b/scripts/tests/kafka_consumer_single.py
similarity index 100%
rename from scripts/kafka_consumer_single.py
rename to scripts/tests/kafka_consumer_single.py
diff --git a/scripts/kafka_consumer_with_qserver.py b/scripts/tests/kafka_consumer_with_qserver.py
similarity index 100%
rename from scripts/kafka_consumer_with_qserver.py
rename to scripts/tests/kafka_consumer_with_qserver.py
diff --git a/scripts/kafka_consumer_with_qserver_dummy.py b/scripts/tests/kafka_consumer_with_qserver_dummy.py
similarity index 100%
rename from scripts/kafka_consumer_with_qserver_dummy.py
rename to scripts/tests/kafka_consumer_with_qserver_dummy.py
diff --git a/scripts/print_kafka_with_main.py b/scripts/tests/print_kafka_with_main.py
similarity index 100%
rename from scripts/print_kafka_with_main.py
rename to scripts/tests/print_kafka_with_main.py
diff --git a/scripts/print_kafka_with_main2.py b/scripts/tests/print_kafka_with_main2.py
similarity index 100%
rename from scripts/print_kafka_with_main2.py
rename to scripts/tests/print_kafka_with_main2.py
diff --git a/scripts/print_kafka_with_main3.py b/scripts/tests/print_kafka_with_main3.py
similarity index 100%
rename from scripts/print_kafka_with_main3.py
rename to scripts/tests/print_kafka_with_main3.py
diff --git a/scripts/print_kafka_with_main_consumer.py b/scripts/tests/print_kafka_with_main_consumer.py
similarity index 100%
rename from scripts/print_kafka_with_main_consumer.py
rename to scripts/tests/print_kafka_with_main_consumer.py
diff --git a/scripts/queue_test_001.py b/scripts/tests/queue_test_001.py
similarity index 100%
rename from scripts/queue_test_001.py
rename to scripts/tests/queue_test_001.py
diff --git a/scripts/queue_test_002.py b/scripts/tests/queue_test_002.py
similarity index 100%
rename from scripts/queue_test_002.py
rename to scripts/tests/queue_test_002.py
diff --git a/scripts/queue_test_002_ex.py b/scripts/tests/queue_test_002_ex.py
similarity index 100%
rename from scripts/queue_test_002_ex.py
rename to scripts/tests/queue_test_002_ex.py
diff --git a/scripts/queue_test_003.py b/scripts/tests/queue_test_003.py
similarity index 100%
rename from scripts/queue_test_003.py
rename to scripts/tests/queue_test_003.py
diff --git a/scripts/queue_test_004.py b/scripts/tests/queue_test_004.py
similarity index 100%
rename from scripts/queue_test_004.py
rename to scripts/tests/queue_test_004.py