Skip to content

Commit

Permalink
test class xlsx_to_inputs()
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Aug 1, 2024
1 parent 48b0e3a commit cbe1912
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 103 deletions.
2 changes: 2 additions & 0 deletions scripts/_LDRD_Kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def _kafka_process():
'fitting_pdf', 'fitting_pdf_path', 'cif_fn', 'gr_fn',
'use_sandbox', 'write_to_sandbox', 'sandbox_tiled_client', 'tiled_client',
'fn_TBD',
'entry', 'iq_q', 'iq_I', 'stream_list', 'uid', 'uid_catalog', 'uid_pdfstream',
''
]

return kafka_list
Expand Down
87 changes: 53 additions & 34 deletions scripts/kafka_consumer_iterate_XPD_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ def print_kafka_messages(beamline_acronym_01, beamline_acronym_02, kin=kin, qin=
## Append raw data & analysis data tiled clients
kin.tiled_client.append(beamline_acronym_01)
kin.tiled_client.append(from_profile(beamline_acronym_01))
kin.tiled_client.append(beamline_acronym_02)
kin.tiled_client.append(from_profile(beamline_acronym_02))
## 'xpd-analysis' is not a catalog name so can't be accessed in databroker

kin.sandbox_tiled_client.append(from_uri(kin.sandbox_tiled_client[0]))

## Append good/bad data folder to csv_path
kin.csv_path.append(os.path.join(kin.csv_path[0], 'good_bad'))
Expand All @@ -126,7 +127,7 @@ def print_message(consumer, doctype, doc,

######### While document (name == 'start') and ('topic' in doc[1]) ##########
## ##
## Only print metadata when the document is a scan                       ##
## Only print metadata when the document is from pdfstream               ##
## ##
#############################################################################
if (name == 'start') and ('topic' in doc[1]):
Expand Down Expand Up @@ -162,33 +163,36 @@ def print_message(consumer, doctype, doc,
if 'sample_type' in message.keys():
print(f"sample type: {message['sample_type']}")

## Reset kin.uid to as empty list
kin.uid = []


global uid
uid = []
######### While document (name == 'event') and ('topic' in doc[1]) ##########
## key 'topic' is added into the document of xpd-analysis ##
## key 'topic' is added into the doc of xpd-analysis in pdfstream ##
## Read uid of analysis data from doc[1]['data']['chi_I'] ##
## Get I(Q) data from the integral of 2D image by pdfstream ##
#############################################################################
if (name == 'event') and ('topic' in doc[1]):
# print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
# f"contents: {pprint.pformat(message)}"
# )
# time.sleep(1)
global entry, iq_Q, iq_I

# entry, iq_Q, iq_I = [], [], []
# f"contents: {pprint.pformat(message)}")

iq_I_uid = doc[1]['data']['chi_I']
entry = sandbox_tiled_client[iq_I_uid]
df = entry.read()
# print(f'{entry.metadata = }')
iq_Q = df['chi_Q'].to_numpy()
iq_I = df['chi_I'].to_numpy()
kin.uid_pdfstream.append(iq_I_uid)
kin.entry.append(sandbox_tiled_client[iq_I_uid])
df = kin.entry[-1].read()
kin.iq_Q.append(df['chi_Q'].to_numpy())
kin.iq_I.append(df['chi_I'].to_numpy())

## Reset kin.uid to as empty list
kin.uid = []



global stream_list
## Acquisition of xray_uvvis_plan finished but analysis of pdfstream not yet
## So just stop queue but not assign uid, stream_list
#### While document (name == 'stop') and ('scattering' in message['num_events']) ####
## Acquisition of xray_uvvis_plan finished but analysis of pdfstream not yet ##
## So just sleep 1 second but not assign uid, stream_list ##
## No need to stop queue since the next queue task is washing loop                ##
#####################################################################################
if (name == 'stop') and ('scattering' in message['num_events']):
print('\n*** qsever stop for data export, identification, and fitting ***\n')
print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
Expand All @@ -197,39 +201,54 @@ def print_message(consumer, doctype, doc,
# inst1 = BInst("queue_stop")
# RM.item_add(inst1, pos='front')
## wait 1 second for databroker to save data
time.sleep(1)
time.sleep(1)
kin.uid = []


## With taking xray_uvvis_plan, analysis of pdfstream finished
## queue should stop before (when acquisition finished)
## Obtain raw data uid by reading metadata in sandbox Tiled

#### (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0) ####
## With taking xray_uvvis_plan and analysis of pdfstream finished ##
## Sleep 1 second and assign uid, stream_list from kin.entry[-1] ##
## No need to stop queue since the next queue task is washing loop                ##
#####################################################################################
elif (name == 'stop') and ('topic' in doc[1]) and (len(message['num_events'])>0):
print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
f"contents: {pprint.pformat(message)}"
)
## wait 1 second for databroker to save data
time.sleep(1)
uid = entry.metadata['run_start']
stream_list = tiled_client[uid].metadata['summary']['stream_names']
kin.uid = kin.entry[-1].metadata['run_start']
kin.uid_catalog.append(kin.uid)
stream_list = kin.tiled_client[-1][kin.uid].metadata['summary']['stream_names']
kin.stream_list = []
for stream_name in syringe_list:
kin.stream_list.append(stream_name)



## Only take a Uv-Vis, no X-ray data but still do analysis of pdfstream
## Stop queue first
## Obtain raw data uid in bluesky doc, message
######### (name == 'stop') and ('take_a_uvvis' in message['num_events']) ##########
## Only take a Uv-Vis, no X-ray data but still do analysis of pdfstream ##
## Stop queue first for identify good/bad data ##
## Obtain raw data uid in bluesky doc, message ##
#####################################################################################
elif (name == 'stop') and ('take_a_uvvis' in message['num_events']):
print('\n*** qsever stop for data export, identification, and fitting ***\n')

print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
f"contents: {pprint.pformat(message)}"
)
f"contents: {pprint.pformat(message)}")

inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
## wait 1 second for databroker to save data
time.sleep(1)
uid = message['run_start']
kin.uid = message['run_start']
kin.uid_catalog.append(kin.uid)
stream_list = list(message['num_events'].keys())
kin.stream_list = []
for stream_name in syringe_list:
kin.stream_list.append(stream_name)


#####################################################################################

## When uid is assigned and type is a string, move to data fitting, calculation
if (name == 'stop') and (type(uid) is str):
print(f'\n**** start to export uid: {uid} ****\n')
Expand Down
181 changes: 181 additions & 0 deletions scripts/notes/Bluesky_publisher.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "266706b2",
"metadata": {},
"source": [
"## Bluesky Publisher\n",
"### used in bsui environment"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "64cd5061-4ffa-4a78-b9bf-f7bc33b7afd9",
"metadata": {},
"outputs": [],
"source": [
"from bluesky_kafka import Publisher"
]
},
{
"cell_type": "markdown",
"id": "5ce7cc25",
"metadata": {},
"source": [
"### res is defined in 00-startup.py"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "19cff38b-5f91-444b-a97a-4c06556167f5",
"metadata": {},
"outputs": [],
"source": [
"p = Publisher(topic=res[1].beamline_topic, bootstrap_servers=res[1].bootstrap_servers, \n",
" key =\"test\", producer_config=res[1].producer_config)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d071468d",
"metadata": {},
"outputs": [],
"source": [
"uids = ['699593ad-c210-4c53-8df5-9b08ccb8e025',\n",
" '74288b93-4e5c-40bb-a8cd-a9b25f68efbf',\n",
" '38729429-64b4-4f61-a07d-bcd7a70aa845',\n",
" 'f9258402-470a-42d8-8493-a357f96a0885']\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b2a9dce",
"metadata": {},
"outputs": [],
"source": [
"from databroker import Broker\n",
"db = Broker.named('xpd-ldrd20-31')\n",
"for uid in uids:\n",
" hdr = db[uid]\n",
" for name, doc in hdr.documents():\n",
" p(name, doc)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "12daf875",
"metadata": {},
"outputs": [],
"source": [
"import databroker\n",
"catalog = databroker.catalog['xpd-ldrd20-31']\n",
"for uid in uids:\n",
" run = catalog[uid]\n",
" for name, doc in run.documents():\n",
" p(name, doc)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5599fbc9",
"metadata": {},
"outputs": [],
"source": [
"from confluent_kafka import Producer, KafkaException\n",
"for name, doc in run.documents():\n",
" try:\n",
" p(name, doc)\n",
" except KafkaException:\n",
" pass"
]
},
{
"cell_type": "markdown",
"id": "470f165f",
"metadata": {},
"source": [
"## Notes for databroker and tiled"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fb2fda28",
"metadata": {},
"outputs": [],
"source": [
"uid_sandbox = '122fe6ee-0851-4526-8c20-6c0f654d74d2'\n",
"sandbox_tiled_client = from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/sandbox\")\n",
"sandbox_tiled_client[uid_sandbox]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a2c7ca85",
"metadata": {},
"outputs": [],
"source": [
"uid_analysis = '7dd39c4f-2c8d-4523-96e7-3f693595f776'\n",
"sandbox_tiled_client = from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/sandbox\")\n",
"sandbox_tiled_client[uid_analysis]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c7279043",
"metadata": {},
"outputs": [],
"source": [
"\n",
"# uid_xpd = '590b010d-bd83-493b-90dd-bdb42a71fc61'\n",
"uid_xpd = '1ae37977-436c-42c0-a105-8a39c5aa8bfd'\n",
"xpd_tiled_client = from_uri(\"https://tiled.nsls2.bnl.gov/api/v1/metadata/xpd/raw\")\n",
"xpd_tiled_client[uid_xpd]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b54224a2",
"metadata": {},
"outputs": [],
"source": [
"'take_a_uvvis'\n",
"uid = 'b17bad22-290e-429d-96ed-0226a45a48ce'\n",
"\n",
"'xray_uvvis_plan'\n",
"uid = '53542574-6fee-4023-bef0-ad353accd01c'\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
3 changes: 2 additions & 1 deletion scripts/tests/print_kafka_with_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ def print_message(name, doc):
print(
f"{datetime.datetime.now().isoformat()} document: {name}\n"
# f"contents: {pprint.pformat(doc)}\n"
f"{doc.keys()}\n"
)
if name == 'stop':
time.sleep(2)
print(db[doc['run_start']].table())
# print(db[doc['run_start']].table())
# print(doc['run_start'])

# zmq_single_request(method='queue_item_add', params={'item':{"name":"insitu_test", "args": [1 ,1]
Expand Down
Loading

0 comments on commit cbe1912

Please sign in to comment.