From cbe19121a48d66e4549f8bf4af1f454aa3186a47 Mon Sep 17 00:00:00 2001
From: XPD Operator
Date: Thu, 1 Aug 2024 19:51:08 -0400
Subject: [PATCH] test class xlsx_to_inputs()

---
 scripts/_LDRD_Kafka.py                        |   2 +
 scripts/kafka_consumer_iterate_XPD_v2.py      |  87 +++++----
 scripts/notes/Bluesky_publisher.ipynb         | 181 ++++++++++++++++++
 scripts/tests/print_kafka_with_main.py        |   3 +-
 .../tests/print_kafka_with_main_consumer.py   | 136 ++++++-------
 startup/00-startup.py                         |   2 +
 6 files changed, 308 insertions(+), 103 deletions(-)
 create mode 100644 scripts/notes/Bluesky_publisher.ipynb

diff --git a/scripts/_LDRD_Kafka.py b/scripts/_LDRD_Kafka.py
index 39d0b80..94c2392 100644
--- a/scripts/_LDRD_Kafka.py
+++ b/scripts/_LDRD_Kafka.py
@@ -44,6 +44,7 @@ def _kafka_process():
         'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn',
         'use_sandbox', 'write_to_sandbox', 'sandbox_tiled_client', 'tiled_client',
         'fn_TBD',
+        'entry', 'iq_Q', 'iq_I', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream',
         ]

     return kafka_list
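For context, the names appended to `kafka_list` are used elsewhere in this patch as attributes on the processing object `kin` (e.g. `kin.iq_Q`, `kin.uid_catalog`), presumably the role of the `xlsx_to_inputs()` class named in the commit subject. A minimal sketch of that pattern; `KafkaInputs` below is hypothetical, not the class in this repo:

```python
# Hypothetical sketch: seed one empty list per name in kafka_list, so the
# consumer can append to kin.<name> without declaring each attribute by hand.
class KafkaInputs:
    def __init__(self, kafka_list):
        for name in kafka_list:
            setattr(self, name, [])

kin = KafkaInputs(['entry', 'iq_Q', 'iq_I', 'stream_list',
                   'uid', 'uid_catalog', 'uid_pdfstream'])
kin.uid_pdfstream.append('122fe6ee-0851-4526-8c20-6c0f654d74d2')
print(kin.uid_pdfstream)
```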
diff --git a/scripts/kafka_consumer_iterate_XPD_v2.py b/scripts/kafka_consumer_iterate_XPD_v2.py
index 19c9258..f9ecf4e 100644
--- a/scripts/kafka_consumer_iterate_XPD_v2.py
+++ b/scripts/kafka_consumer_iterate_XPD_v2.py
@@ -105,8 +105,9 @@ def print_kafka_messages(beamline_acronym_01, beamline_acronym_02, kin=kin, qin=
     ## Append raw data & analysis data tiled clients
     kin.tiled_client.append(beamline_acronym_01)
     kin.tiled_client.append(from_profile(beamline_acronym_01))
-    kin.tiled_client.append(beamline_acronym_02)
-    kin.tiled_client.append(from_profile(beamline_acronym_02))
+    ## 'xpd-analysis' is not a catalog name, so it can't be accessed in databroker
+    kin.sandbox_tiled_client.append(from_uri(kin.sandbox_tiled_client[0]))

     ## Append good/bad data folder to csv_path
     kin.csv_path.append(os.path.join(kin.csv_path[0], 'good_bad'))
@@ -126,7 +127,7 @@ def print_message(consumer, doctype, doc,
         ######### While document (name == 'start') and ('topic' in doc[1]) ##########
         ##                                                                         ##
-        ##  Only print metadata when the document is a scan                        ##
+        ##  Only print metadata when the document is from pdfstream                ##
         ##                                                                         ##
         #############################################################################
         if (name == 'start') and ('topic' in doc[1]):
@@ -162,33 +163,36 @@ def print_message(consumer, doctype, doc,
             if 'sample_type' in message.keys():
                 print(f"sample type: {message['sample_type']}")

+            ## Reset kin.uid to an empty list
+            kin.uid = []
+
-            global uid
-            uid = []

         ######### While document (name == 'event') and ('topic' in doc[1]) ##########
-        ##  key 'topic' is added into the document of xpd-analysis                 ##
+        ##  key 'topic' is added into the doc of xpd-analysis by pdfstream         ##
         ##  Read uid of analysis data from doc[1]['data']['chi_I']                 ##
         ##  Get I(Q) data from the integral of 2D image by pdfstream               ##
         #############################################################################
         if (name == 'event') and ('topic' in doc[1]):
             # print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
-            #       f"contents: {pprint.pformat(message)}"
-            #       )
-            # time.sleep(1)
-            global entry, iq_Q, iq_I
-
-            # entry, iq_Q, iq_I = [], [], []
+            #       f"contents: {pprint.pformat(message)}")

             iq_I_uid = doc[1]['data']['chi_I']
-            entry = sandbox_tiled_client[iq_I_uid]
-            df = entry.read()
-            # print(f'{entry.metadata = }')
-            iq_Q = df['chi_Q'].to_numpy()
-            iq_I = df['chi_I'].to_numpy()
+            kin.uid_pdfstream.append(iq_I_uid)
+            kin.entry.append(sandbox_tiled_client[iq_I_uid])
+            df = kin.entry[-1].read()
+            kin.iq_Q.append(df['chi_Q'].to_numpy())
+            kin.iq_I.append(df['chi_I'].to_numpy())
+
+            ## Reset kin.uid to an empty list
+            kin.uid = []

-            global stream_list
-        ## Acquisition of xray_uvvis_plan finished but analysis of pdfstream not yet
-        ## So just stop queue but not assign uid, stream_list
+        #### While document (name == 'stop') and ('scattering' in message['num_events']) ####
+        ##  Acquisition of xray_uvvis_plan finished but analysis of pdfstream not yet      ##
+        ##  So just sleep 1 second but do not assign uid, stream_list                      ##
+        ##  No need to stop the queue since the next queue task is the washing loop        ##
+        #####################################################################################
         if (name == 'stop') and ('scattering' in message['num_events']):
             print('\n*** qserver stop for data export, identification, and fitting ***\n')
             print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
@@ -197,39 +201,54 @@ def print_message(consumer, doctype, doc,
                   f"contents: {pprint.pformat(message)}"
                   )
             # inst1 = BInst("queue_stop")
             # RM.item_add(inst1, pos='front')
             ## wait 1 second for databroker to save data
-            time.sleep(1)
+            time.sleep(1)
+            kin.uid = []

-        ## With taking xray_uvvis_plan, analysis of pdfstream finished
-        ## queue should stop before (when acquisition finished)
-        ## Obtain raw data uid by reading metadata in sandbox Tiled
+        #### (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0) ####
+        ##  With xray_uvvis_plan taken and the pdfstream analysis finished                 ##
+        ##  Sleep 1 second, then assign uid, stream_list from kin.entry[-1]                ##
+        ##  No need to stop the queue since the next queue task is the washing loop        ##
+        #####################################################################################
         elif (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0):
             print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
                   f"contents: {pprint.pformat(message)}"
                   )
             ## wait 1 second for databroker to save data
             time.sleep(1)
-            uid = entry.metadata['run_start']
-            stream_list = tiled_client[uid].metadata['summary']['stream_names']
+            kin.uid = kin.entry[-1].metadata['run_start']
+            kin.uid_catalog.append(kin.uid)
+            stream_list = kin.tiled_client[-1][kin.uid].metadata['summary']['stream_names']
+            kin.stream_list = []
+            for stream_name in stream_list:
+                kin.stream_list.append(stream_name)

-        ## Only take a Uv-Vis, no X-ray data but still do analysis of pdfstream
-        ## Stop queue first
-        ## Obtain raw data uid in bluesky doc, message
+        ######### (name == 'stop') and ('take_a_uvvis' in message['num_events']) ###########
+        ##  Only take a UV-Vis, no X-ray data, but still do analysis of pdfstream          ##
+        ##  Stop the queue first to identify good/bad data                                 ##
+        ##  Obtain raw data uid from the bluesky doc, message                              ##
+        #####################################################################################
         elif (name == 'stop') and ('take_a_uvvis' in message['num_events']):
             print('\n*** qserver stop for data export, identification, and fitting ***\n')
             print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
-                  f"contents: {pprint.pformat(message)}"
-                  )
+                  f"contents: {pprint.pformat(message)}")
             inst1 = BInst("queue_stop")
             RM.item_add(inst1, pos='front')
             ## wait 1 second for databroker to save data
             time.sleep(1)
-            uid = message['run_start']
+            kin.uid = message['run_start']
+            kin.uid_catalog.append(kin.uid)
             stream_list = list(message['num_events'].keys())
+            kin.stream_list = []
+            for stream_name in stream_list:
+                kin.stream_list.append(stream_name)

-
+        ## /////////////////////////////////////////////////////////////////////////////////
+        ## When uid is assigned and its type is a string, move on to data fitting and calculation
         if (name == 'stop') and (type(uid) is str):
             print(f'\n**** start to export uid: {uid} ****\n')
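The `print_message` branching above reduces to a dispatch on the document name plus a few payload checks: pdfstream marks its documents with a `'topic'` key, and the three stop cases are told apart by `num_events`. A stripped-down, runnable sketch of that routing (payloads here are illustrative stand-ins, not real pdfstream documents):

```python
def route(name, doc):
    # Mirrors the branching in print_message() above.
    if name == 'start' and 'topic' in doc:
        return 'pdfstream start: print metadata, reset kin.uid'
    if name == 'event' and 'topic' in doc:
        return 'pdfstream event: fetch I(Q) via doc["data"]["chi_I"]'
    if name == 'stop' and 'scattering' in doc.get('num_events', {}):
        return 'acquisition done, analysis pending: sleep, leave uid empty'
    if name == 'stop' and 'topic' in doc and doc.get('num_events'):
        return 'analysis done: take uid from kin.entry[-1].metadata'
    if name == 'stop' and 'take_a_uvvis' in doc.get('num_events', {}):
        return 'UV-Vis only: stop queue, take uid from doc["run_start"]'
    return 'ignored'

print(route('stop', {'num_events': {'take_a_uvvis': 30}}))
```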
diff --git a/scripts/notes/Bluesky_publisher.ipynb b/scripts/notes/Bluesky_publisher.ipynb
new file mode 100644
index 0000000..5ce16c7
--- /dev/null
+++ b/scripts/notes/Bluesky_publisher.ipynb
@@ -0,0 +1,181 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "266706b2",
+   "metadata": {},
+   "source": [
+    "## Bluesky Publisher\n",
+    "### used in the bsui environment"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "64cd5061-4ffa-4a78-b9bf-f7bc33b7afd9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from bluesky_kafka import Publisher"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5ce7cc25",
+   "metadata": {},
+   "source": [
+    "### res is defined in 00-startup.py"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "19cff38b-5f91-444b-a97a-4c06556167f5",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = Publisher(topic=res[1].beamline_topic, bootstrap_servers=res[1].bootstrap_servers,\n",
+    "              key=\"test\", producer_config=res[1].producer_config)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d071468d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "uids = ['699593ad-c210-4c53-8df5-9b08ccb8e025',\n",
+    "        '74288b93-4e5c-40bb-a8cd-a9b25f68efbf',\n",
+    "        '38729429-64b4-4f61-a07d-bcd7a70aa845',\n",
+    "        'f9258402-470a-42d8-8493-a357f96a0885']\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4b2a9dce",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from databroker import Broker\n",
+    "db = Broker.named('xpd-ldrd20-31')\n",
+    "for uid in uids:\n",
+    "    hdr = db[uid]\n",
+    "    for name, doc in hdr.documents():\n",
+    "        p(name, doc)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "12daf875",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import databroker\n",
+    "catalog = databroker.catalog['xpd-ldrd20-31']\n",
+    "for uid in uids:\n",
+    "    run = catalog[uid]\n",
+    "    for name, doc in run.documents():\n",
+    "        p(name, doc)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "5599fbc9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from confluent_kafka import Producer, KafkaException\n",
+    "for name, doc in run.documents():\n",
+    "    try:\n",
+    "        p(name, doc)\n",
+    "    except KafkaException:\n",
+    "        pass"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "470f165f",
+   "metadata": {},
+   "source": [
+    "## Notes for databroker and tiled"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "fb2fda28",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from tiled.client import from_uri\n",
+    "uid_sandbox = '122fe6ee-0851-4526-8c20-6c0f654d74d2'\n",
+    "sandbox_tiled_client = from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/sandbox\")\n",
+    "sandbox_tiled_client[uid_sandbox]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "a2c7ca85",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "uid_analysis = '7dd39c4f-2c8d-4523-96e7-3f693595f776'\n",
+    "sandbox_tiled_client = from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/sandbox\")\n",
+    "sandbox_tiled_client[uid_analysis]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c7279043",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# uid_xpd = '590b010d-bd83-493b-90dd-bdb42a71fc61'\n",
+    "uid_xpd = '1ae37977-436c-42c0-a105-8a39c5aa8bfd'\n",
+    "xpd_tiled_client = from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/raw\")\n",
+    "xpd_tiled_client[uid_xpd]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b54224a2",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "## 'take_a_uvvis'\n",
+    "uid = 'b17bad22-290e-429d-96ed-0226a45a48ce'\n",
+    "\n",
+    "## 'xray_uvvis_plan'\n",
+    "uid = '53542574-6fee-4023-bef0-ad353accd01c'\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.8"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
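On the receiving end, the documents this notebook republishes can be watched with `bluesky_kafka`'s `RemoteDispatcher`. A minimal sketch; the topic string, bootstrap servers, and group id below are placeholders, not values confirmed by this repo:

```python
from bluesky_kafka import RemoteDispatcher

def print_doc(name, doc):
    # Print each republished document's type and, when present, its uid.
    print(name, doc.get('uid', ''))

dispatcher = RemoteDispatcher(
    topics=["xpd-ldrd20-31.bluesky.runengine.documents"],  # assumed topic name
    bootstrap_servers="localhost:9092",                    # placeholder
    group_id="bluesky-publisher-notes",                    # placeholder
    consumer_config={"auto.offset.reset": "latest"},
)
dispatcher.subscribe(print_doc)
dispatcher.start()  # blocks until interrupted
```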
from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/raw\")\n", + "xpd_tiled_client[uid_xpd]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b54224a2", + "metadata": {}, + "outputs": [], + "source": [ + "'take_a_uvvis'\n", + "uid = 'b17bad22-290e-429d-96ed-0226a45a48ce'\n", + "\n", + "'xray_uvvis_plan'\n", + "uid = '53542574-6fee-4023-bef0-ad353accd01c'\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.8" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/scripts/tests/print_kafka_with_main.py b/scripts/tests/print_kafka_with_main.py index cb4748a..5968285 100644 --- a/scripts/tests/print_kafka_with_main.py +++ b/scripts/tests/print_kafka_with_main.py @@ -23,10 +23,11 @@ def print_message(name, doc): print( f"{datetime.datetime.now().isoformat()} document: {name}\n" # f"contents: {pprint.pformat(doc)}\n" + f"{doc.keys()}\n" ) if name == 'stop': time.sleep(2) - print(db[doc['run_start']].table()) + # print(db[doc['run_start']].table()) # print(doc['run_start']) # zmq_single_request(method='queue_item_add', params={'item':{"name":"insitu_test", "args": [1 ,1] diff --git a/scripts/tests/print_kafka_with_main_consumer.py b/scripts/tests/print_kafka_with_main_consumer.py index 32e7184..3a23558 100644 --- a/scripts/tests/print_kafka_with_main_consumer.py +++ b/scripts/tests/print_kafka_with_main_consumer.py @@ -10,9 +10,9 @@ import time import databroker -import _data_export as de -from _plot_helper import plot_uvvis -import _data_analysis as da +# import _data_export as de +# from _plot_helper import plot_uvvis +# import _data_analysis as da # db = databroker.Broker.named('xpd-ldrd20-31') # catalog = databroker.catalog['xpd-ldrd20-31'] @@ -29,7 +29,7 @@ plt.rcParams["figure.raise_window"] = False -def print_kafka_messages(beamline_acronym, csv_path): +def print_kafka_messages(beamline_acronym): print(f"Listening for Kafka messages for {beamline_acronym}") global db, catalog @@ -40,76 +40,76 @@ def print_kafka_messages(beamline_acronym, csv_path): # def print_message(name, doc): def print_message(consumer, doctype, doc): name, message = doc - # print( - # f"{datetime.datetime.now().isoformat()} document: {name}\n" - # f"document keys: {list(message.keys())}\n" - # f"contents: {pprint.pformat(message)}\n" - # ) - if name == 'start': - print( - f"{datetime.datetime.now().isoformat()} documents {name}\n" - f"document keys: {list(message.keys())}") + print( + f"{datetime.datetime.now().isoformat()} document: {name}\n" + f"document keys: {list(message.keys())}\n" + # f"contents: {pprint.pformat(message)}\n" + ) + # if name == 'start': + # print( + # f"{datetime.datetime.now().isoformat()} documents {name}\n" + # f"document keys: {list(message.keys())}") - if 'uid' in message.keys(): - print(f"uid: {message['uid']}") - if 'plan_name' in message.keys(): - print(f"plan name: {message['plan_name']}") - if 'detectors' in message.keys(): - print(f"detectors: {message['detectors']}") - if 'pumps' in message.keys(): - print(f"pumps: {message['pumps']}") - # if 'detectors' in message.keys(): - # print(f"detectors: {message['detectors']}") - if 'uvvis' in message.keys() and message['plan_name']!='count': - print(f"uvvis mode:\n" - f" 
diff --git a/scripts/tests/print_kafka_with_main_consumer.py b/scripts/tests/print_kafka_with_main_consumer.py
index 32e7184..3a23558 100644
--- a/scripts/tests/print_kafka_with_main_consumer.py
+++ b/scripts/tests/print_kafka_with_main_consumer.py
@@ -10,9 +10,9 @@ import time

 import databroker

-import _data_export as de
-from _plot_helper import plot_uvvis
-import _data_analysis as da
+# import _data_export as de
+# from _plot_helper import plot_uvvis
+# import _data_analysis as da

 # db = databroker.Broker.named('xpd-ldrd20-31')
 # catalog = databroker.catalog['xpd-ldrd20-31']
@@ -29,7 +29,7 @@ plt.rcParams["figure.raise_window"] = False

-def print_kafka_messages(beamline_acronym, csv_path):
+def print_kafka_messages(beamline_acronym):
     print(f"Listening for Kafka messages for {beamline_acronym}")

     global db, catalog
@@ -40,76 +40,76 @@ def print_kafka_messages(beamline_acronym, csv_path):

     # def print_message(name, doc):
     def print_message(consumer, doctype, doc):
         name, message = doc
-        # print(
-        #     f"{datetime.datetime.now().isoformat()} document: {name}\n"
-        #     f"document keys: {list(message.keys())}\n"
-        #     f"contents: {pprint.pformat(message)}\n"
-        # )
-        if name == 'start':
-            print(
-                f"{datetime.datetime.now().isoformat()} documents {name}\n"
-                f"document keys: {list(message.keys())}")
+        print(
+            f"{datetime.datetime.now().isoformat()} document: {name}\n"
+            f"document keys: {list(message.keys())}\n"
+            # f"contents: {pprint.pformat(message)}\n"
+        )
+        # if name == 'start':
+        #     print(
+        #         f"{datetime.datetime.now().isoformat()} documents {name}\n"
+        #         f"document keys: {list(message.keys())}")

-            if 'uid' in message.keys():
-                print(f"uid: {message['uid']}")
-            if 'plan_name' in message.keys():
-                print(f"plan name: {message['plan_name']}")
-            if 'detectors' in message.keys():
-                print(f"detectors: {message['detectors']}")
-            if 'pumps' in message.keys():
-                print(f"pumps: {message['pumps']}")
-            # if 'detectors' in message.keys():
-            #     print(f"detectors: {message['detectors']}")
-            if 'uvvis' in message.keys() and message['plan_name']!='count':
-                print(f"uvvis mode:\n"
-                      f"    integration time: {message['uvvis'][0]} ms\n"
-                      f"    num spectra averaged: {message['uvvis'][1]}\n"
-                      f"    buffer capacity: {message['uvvis'][2]}"
-                      )
-            elif 'uvvis' in message.keys() and message['plan_name']=='count':
-                print(f"uvvis mode:\n"
-                      f"    spectrum type: {message['uvvis'][0]}\n"
-                      f"    integration time: {message['uvvis'][2]} ms\n"
-                      f"    num spectra averaged: {message['uvvis'][3]}\n"
-                      f"    buffer capacity: {message['uvvis'][4]}"
-                      )
-            if 'mixer' in message.keys():
-                print(f"mixer: {message['mixer']}")
-            if 'sample_type' in message.keys():
-                print(f"sample type: {message['sample_type']}")
+        # if 'uid' in message.keys():
+        #     print(f"uid: {message['uid']}")
+        # if 'plan_name' in message.keys():
+        #     print(f"plan name: {message['plan_name']}")
+        # if 'detectors' in message.keys():
+        #     print(f"detectors: {message['detectors']}")
+        # if 'pumps' in message.keys():
+        #     print(f"pumps: {message['pumps']}")
+        # # if 'detectors' in message.keys():
+        # #     print(f"detectors: {message['detectors']}")
+        # if 'uvvis' in message.keys() and message['plan_name']!='count':
+        #     print(f"uvvis mode:\n"
+        #           f"    integration time: {message['uvvis'][0]} ms\n"
+        #           f"    num spectra averaged: {message['uvvis'][1]}\n"
+        #           f"    buffer capacity: {message['uvvis'][2]}"
+        #           )
+        # elif 'uvvis' in message.keys() and message['plan_name']=='count':
+        #     print(f"uvvis mode:\n"
+        #           f"    spectrum type: {message['uvvis'][0]}\n"
+        #           f"    integration time: {message['uvvis'][2]} ms\n"
+        #           f"    num spectra averaged: {message['uvvis'][3]}\n"
+        #           f"    buffer capacity: {message['uvvis'][4]}"
+        #           )
+        # if 'mixer' in message.keys():
+        #     print(f"mixer: {message['mixer']}")
+        # if 'sample_type' in message.keys():
+        #     print(f"sample type: {message['sample_type']}")

-        if name == 'stop':
-            # print('Kafka test good!!')
-            print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
-                  f"contents: {pprint.pformat(message)}"
-                  )
-            # num_events = len(message['num_events'])
+        # if name == 'stop':
+        #     # print('Kafka test good!!')
+        #     print(f"{datetime.datetime.now().isoformat()} documents {name}\n"
+        #           f"contents: {pprint.pformat(message)}"
+        #           )
+        #     # num_events = len(message['num_events'])

-            time.sleep(2)
-            uid = message['run_start']
-            print(f'\n**** start to export uid: {uid} ****\n')
-            for stream_name in ['primary', 'absorbance', 'fluorescence']:
-                if stream_name in message['num_events'].keys():
-                    qepro_dic, metadata_dic = de.read_qepro_by_stream(uid, stream_name=stream_name, data_agent='catalog')
-                    de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name)
-                    print(f'\n** export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(csv_path)} **\n')
-                    u = plot_uvvis(qepro_dic, metadata_dic)
-                    u.plot_data()
-                    print(f'\n** Plot {stream_name} in uid: {uid[0:8]} complete **\n')
+        #     time.sleep(2)
+        #     uid = message['run_start']
+        #     print(f'\n**** start to export uid: {uid} ****\n')
+        #     # for stream_name in ['primary', 'absorbance', 'fluorescence']:
+        #     #     if stream_name in message['num_events'].keys():
+        #     #         qepro_dic, metadata_dic = de.read_qepro_by_stream(uid, stream_name=stream_name, data_agent='catalog')
+        #     #         de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name)
+        #     #         print(f'\n** export {stream_name} in uid: {uid[0:8]} to ../{os.path.basename(csv_path)} **\n')
+        #     #         u = plot_uvvis(qepro_dic, metadata_dic)
+        #     #         u.plot_data()
+        #     #         print(f'\n** Plot {stream_name} in uid: {uid[0:8]} complete **\n')

-                    if qepro_dic['QEPro_spectrum_type'][0] == 2:
-                        print('\n*** start to identify good/bad data ***\n')
-                        x, y, p, f, popt = da._fitting_in_kafka(qepro_dic, metadata_dic, key_height=200, distance=100, height=50)
+        #     #         if qepro_dic['QEPro_spectrum_type'][0] == 2:
+        #     #             print('\n*** start to identify good/bad data ***\n')
+        #     #             x, y, p, f, popt = da._fitting_in_kafka(qepro_dic, metadata_dic, key_height=200, distance=100, height=50)

-                        ff={'fit_function': f, 'curve_fit': popt}
-                        de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff)
-                        print(f'\n** export fitting results complete**\n')
+        #     #             ff={'fit_function': f, 'curve_fit': popt}
+        #     #             de.dic_to_csv_for_stream(csv_path, qepro_dic, metadata_dic, stream_name=stream_name, fitting=ff)
+        #     #             print(f'\n** export fitting results complete**\n')

-                        u.plot_peak_fit(x, y, p, f, popt, fill_between=True)
-                        print(f'\n** plot fitting results complete**\n')
+        #     #             u.plot_peak_fit(x, y, p, f, popt, fill_between=True)
+        #     #             print(f'\n** plot fitting results complete**\n')

-            print('\n*** export, identify good/bad, fitting complete ***\n')
-            print('########### Events printing division ############\n')
+        #     print('\n*** export, identify good/bad, fitting complete ***\n')
+        #     print('########### Events printing division ############\n')

     kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")
@@ -134,4 +134,4 @@

 if __name__ == "__main__":
     import sys
-    print_kafka_messages(sys.argv[1], sys.argv[2])
+    print_kafka_messages(sys.argv[1])
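With `csv_path` dropped from the signature, the test consumer now takes only the beamline acronym. A quick smoke test, assuming the script's dependencies and /etc/bluesky/kafka.yml are present on the host (equivalent to `python print_kafka_with_main_consumer.py xpd-ldrd20-31`):

```python
# The __main__ guard means importing the module does not start a consumer by
# itself; calling the function directly blocks and prints document names and
# keys as they arrive.
from print_kafka_with_main_consumer import print_kafka_messages

print_kafka_messages("xpd-ldrd20-31")
```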
diff --git a/startup/00-startup.py b/startup/00-startup.py
index 2e316dd..ca838b2 100644
--- a/startup/00-startup.py
+++ b/startup/00-startup.py
@@ -38,12 +38,14 @@

 db = Broker.named("xpd-ldrd20-31")
 # db = Broker.named("xpd")
+# db = Broker.named("xpd-analysis")
 bec = BestEffortCallback()

 RE.subscribe(db.insert)
 RE.subscribe(bec)

 res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd-ldrd20-31")
 # res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd")
+# res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd-analysis")
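For reference, the publish-and-save wiring that 00-startup.py sets up can be reproduced outside bsui with simulated hardware. A minimal sketch, assuming the xpd-ldrd20-31 catalog and /etc/bluesky/kafka.yml are configured on the machine:

```python
from bluesky import RunEngine
from bluesky.plans import count
from ophyd.sim import det            # simulated detector stands in for real hardware
from databroker import Broker
import nslsii

RE = RunEngine({})
db = Broker.named("xpd-ldrd20-31")   # raw-data catalog, as in 00-startup.py
RE.subscribe(db.insert)              # save documents locally
res = nslsii.configure_kafka_publisher(RE, beamline_name="xpd-ldrd20-31")
RE(count([det]))                     # one run: documents go to databroker and Kafka
```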