Skip to content

Commit

Permalink
Revise iterative Kafka for ML agent
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Dec 5, 2023
1 parent 7a8e2ed commit 0e545b2
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 332 deletions.
147 changes: 89 additions & 58 deletions scripts/_synthesis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,6 @@
from bluesky_queueserver.manager.comms import zmq_single_request
import _data_export as de

## Add ietration of ML agent into Qsever
def iterate_queue(
new_points,
syringe_list,
pump_list,
target_vol_list,
syringe_mater_list,
precursor_list,
mixer,
resident_t_ratio,
prefix,
wash_tube,
name_by_prefix=True,
num_abs=5,
num_flu=5,
new_points_unit='ul/min',
pos='back',
dummy_qserver=False,
):

# To do: unit conversion
# if rate_unit != 'ul/min':

# num_iteration = new_points[0].shape[0]
set_target_list = [0 for i in range(new_points[0].shape[1])]
sample = _auto_name_sample(new_points[0], prefix=prefix)

# infuse_rates_float = new_points[0]
# unit_array = np.full([infuse_rates_float.shape[0], infuse_rates_float.shape[1]],' ul/min', dtype='U7')
# infuse_rates_string = infuse_rates_float.astype('U25')
# infuse_rates = np.char.add(infuse_rates_string, unit_array)


return synthesis_queue(
syringe_list=syringe_list,
pump_list=pump_list,
set_target_list=set_target_list,
target_vol_list=target_vol_list,
rate_list=new_points[0],
syringe_mater_list=syringe_mater_list,
precursor_list=precursor_list,
mixer=mixer,
resident_t_ratio=resident_t_ratio,
prefix=prefix,
sample=sample,
wash_tube=wash_tube,
name_by_prefix=name_by_prefix,
num_abs=num_abs,
num_flu=num_flu,
rate_unit=new_points_unit,
pos='back',
dummy_qserver=False,
)


## Arrange tasks of for PQDs synthesis
def synthesis_queue(
syringe_list,
Expand All @@ -77,7 +22,8 @@ def synthesis_queue(
num_abs=5,
num_flu=5,
pos='back',
dummy_qserver=False,
dummy_qserver=False,
is_iteration=False,
):

if name_by_prefix:
Expand Down Expand Up @@ -158,21 +104,31 @@ def synthesis_queue(
'user':'chlin'})

else:
if is_iteration:
rest_time = resident_t_ratio[-1]

elif len(resident_t_ratio) == 1:
rest_time = resident_t_ratio[0]
elif len(resident_t_ratio) > 1 and i==0:
rest_time = resident_t_ratio[0]
elif len(resident_t_ratio) > 1 and i>0:
rest_time = resident_t_ratio[-1]

zmq_single_request(
method='queue_item_add',
params={
'item':{
"name":"wait_equilibrium",
"args": [pump_list, mixer],
"kwargs": {"ratio":resident_t_ratio},
"kwargs": {"ratio":rest_time},
"item_type":"plan"},
'pos': pos,
'user_group':'primary',
'user':'chlin'})



## 4. Take a fluorescence peak to check reaction
## 4-1. Take a fluorescence peak to check reaction
zmq_single_request(
method='queue_item_add',
params={
Expand All @@ -190,6 +146,24 @@ def synthesis_queue(
'user_group':'primary',
'user':'chlin'})

# ## 4-2. Take a Absorption spectra to check reaction
# zmq_single_request(
# method='queue_item_add',
# params={
# 'item':{
# "name":"take_a_uvvis_csv_q",
# "kwargs": {
# 'sample_type':sample[i],
# 'spectrum_type':'Absorbtion',
# 'correction_type':'Reference',
# 'pump_list':pump_list,
# 'precursor_list':precursor_list,
# 'mixer':mixer},
# "item_type":"plan"},
# 'pos': pos,
# 'user_group':'primary',
# 'user':'chlin'})

#### Kafka check data here.

## 5. Sleep for 5 seconds for Kafak to check good/bad data
Expand Down Expand Up @@ -403,4 +377,61 @@ def rate_unit_converter(r0 = 'ul/min', r1 = 'ul/min'):
'wait_equilibrium': '{...}',
'xray_uvvis_plan': '{...}'},
## Add ietration of ML agent into Qsever
def iterate_queue(
new_points,
syringe_list,
pump_list,
target_vol_list,
syringe_mater_list,
precursor_list,
mixer,
resident_t_ratio,
prefix,
wash_tube,
name_by_prefix=True,
num_abs=5,
num_flu=5,
new_points_unit='ul/min',
pos='back',
dummy_qserver=False,
):
# To do: unit conversion
# if rate_unit != 'ul/min':
# num_iteration = new_points[0].shape[0]
set_target_list = [0 for i in range(new_points[0].shape[1])]
sample = _auto_name_sample(new_points[0], prefix=prefix)
# infuse_rates_float = new_points[0]
# unit_array = np.full([infuse_rates_float.shape[0], infuse_rates_float.shape[1]],' ul/min', dtype='U7')
# infuse_rates_string = infuse_rates_float.astype('U25')
# infuse_rates = np.char.add(infuse_rates_string, unit_array)
return synthesis_queue(
syringe_list=syringe_list,
pump_list=pump_list,
set_target_list=set_target_list,
target_vol_list=target_vol_list,
rate_list=new_points[0],
syringe_mater_list=syringe_mater_list,
precursor_list=precursor_list,
mixer=mixer,
resident_t_ratio=resident_t_ratio,
prefix=prefix,
sample=sample,
wash_tube=wash_tube,
name_by_prefix=name_by_prefix,
num_abs=num_abs,
num_flu=num_flu,
rate_unit=new_points_unit,
pos='back',
dummy_qserver=False,
)
'''


Loading

0 comments on commit 0e545b2

Please sign in to comment.