Skip to content

Commit

Permalink
Update BS plan, kafka, qsever for ML
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD Operator committed Nov 1, 2023
1 parent 7ed2b87 commit 98e73bc
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 98 deletions.
101 changes: 96 additions & 5 deletions scripts/.ipynb_checkpoints/qserver_zmq_001-checkpoint.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
"## 1. Open a terminal for qserver manager\n",
"~$ conda activate 2023-1.3-py310-tiled\n",
"\n",
"~$ start-re-manager --startup-dir ~/.ipython/profile_collection/startup --keep-re"
"~$ start-re-manager --startup-dir ~/.ipython/profile_collection/startup --keep-re\n",
"\n",
"~$ start-re-manager --startup-dir ~/.ipython/profile_collection/startup --keep-re --redis-addr localhost:60737"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c1098d2d-02d9-41e0-8411-84ba2797fbe8",
"metadata": {},
"source": [
"## 2. Open another terminal for BSUI ipython\n",
"~$ conda activate 2023-1.3-py310-tiled\n",
"\n",
"~$ BS_ENV=2023-1.3-py310-tiled bsui"
]
Expand Down Expand Up @@ -297,7 +299,96 @@
" \"data_agent\":\"tiled\"}, \n",
" \"item_type\":\"plan\"}, \n",
" 'user_group':'primary', \n",
" 'user':'chlin'})\n"
" 'user':'chlin'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32f7b36f",
"metadata": {},
"outputs": [],
"source": [
"zmq_single_request(method='queue_item_add', params={'item':{\"name\":\"insitu_test\", \"args\": [1 ,1], \"kwargs\": {\"sample\": \"quinine_qserver\", \"csv_path\": \"/home/xf28id2/Documents/ChengHung/20230403_qserver_collection\", \"data_agent\":\"tiled\"}, \"item_type\":\"plan\"}, 'user_group':'primary', 'user':'chlin'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ea5f28a6",
"metadata": {},
"outputs": [],
"source": [
"zmq_single_request(method='queue_item_add', params={'item':{\"name\":\"take_a_uvvis_csv_q\", \n",
" \"kwargs\": {\"sample_type\": \"quinine_qserver\", \n",
" \"spectrum_type\": 'Corrected Sample', \n",
" \"correction_type\": 'Dark',\n",
" \"data_agent\":\"tiled\"}, \n",
" \"item_type\":\"plan\"}, \n",
" 'user_group':'primary', \n",
" 'user':'chlin'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2e50765-b098-4ef2-b38b-adb77d0b7d1c",
"metadata": {},
"outputs": [],
"source": [
"sl = 50 # syringe_list\n",
"pl = 'dds1_p1' # pump_list\n",
"ir = '99 ul/min' # infuse_rate\n",
"tvl = '30 ml' # target_vol_list\n",
"stl = False # set_target_list\n",
"sml = 'steel' # syringe_mater_list\n",
"zmq_single_request(method='queue_item_add', \n",
" params={\n",
" 'item':{\"name\":\"set_group_infuse2\", \n",
" \"args\": [[sl], [pl]], \n",
" \"kwargs\": {\"rate_list\":[ir], \"target_vol_list\":[tvl], \"set_target_list\":[stl], \"syringe_mater_list\":[sml]}, \n",
" \"item_type\":\"plan\"\n",
" }, 'user_group':'primary', 'user':'chlin'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "25660cbd-3f2b-4cb4-a129-eec1287798cc",
"metadata": {},
"outputs": [],
"source": [
"zmq_single_request(method='queue_item_add', \n",
" params={\n",
" 'item':{\"name\":\"start_group_infuse\", \n",
" \"args\": [[pl], [ir]], \n",
" \"item_type\":\"plan\"\n",
" }, 'user_group':'primary', 'user':'chlin'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9dd512c1-cc95-42e5-be7a-8c3493a30034",
"metadata": {},
"outputs": [],
"source": [
"zmq_single_request(method='queue_item_add', \n",
" params={\n",
" 'item':{\"name\":\"stop_group\", \n",
" \"args\": [pump_list], \n",
" \"item_type\":\"plan\"\n",
" }, 'user_group':'primary', 'user':'chlin'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d17b2b61",
"metadata": {},
"outputs": [],
"source": [
"zmq_single_request(method='queue_item_add', params={'item':{\"name\":\"take_a_uvvis_csv_q\",\"kwargs\":{\"sample_type\":\"quinine_qserver\",\"spectrum_type\":\"Corrected Sample\",\"correction_type\":\"Dark\",\"data_agent\":\"tiled\"},\"item_type\":\"plan\"},'user_group':'primary','user':'chlin'})"
]
},
{
Expand Down Expand Up @@ -815,7 +906,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"id": "a855ce6b-f4d3-41d2-a606-c074abace2c8",
"metadata": {
"tags": []
Expand Down Expand Up @@ -864,7 +955,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.10.12"
}
},
"nbformat": 4,
Expand Down
60 changes: 35 additions & 25 deletions scripts/kafka_consumer_single.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@

##################################################################
# Define namespace for tasks in Qserver and Kafka
dummy_test = bool(input_dic['dummy_test'][0])
dummy_kafka = bool(input_dic['dummy_test'][0])
# dummy_qserver = bool(input_dic['dummy_test'][1])
csv_path = input_dic['csv_path'][0]
key_height = input_dic['key_height']
height = input_dic['height']
Expand All @@ -63,22 +64,30 @@
from bloptools.bayesian import Agent, DOF, Objective

dofs = [
#DOF(name="infusion_rate_1", limits=(1500, 2000)),
#DOF(name="infusion_rate_2", limits=(1500, 2000)),
DOF(name="infusion_rate_3", limits=(1500, 2000)),
DOF(name="infusion_rate_1", limits=(30, 150)),
DOF(name="infusion_rate_2", limits=(30, 150)),
# DOF(name="infusion_rate_3", limits=(1500, 2000)),
]

objectives = [
Objective(name="Peak emission", key="peak_emission", target=525, units="nm"),
Objective(name="Peak width", key="peak_fwhm", minimize=True, units="nm"),
Objective(name="Quantum yield", key="plqy"),
Objective(name="Peak emission", key="peak_emission", target=500),
Objective(name="Peak width", key="peak_fwhm", target="min"),
Objective(name="Quantum yield", key="plqy", target="max"),
]


# objectives = [
# Objective(name="Peak emission", key="peak_emission", target=525, units="nm"),
# Objective(name="Peak width", key="peak_fwhm", minimize=True, units="nm"),
# Objective(name="Quantum yield", key="plqy"),
# ]

USE_AGENT = False
agent = Agent(dofs=dofs, objectives=objectives, db=None, verbose=True)


def print_kafka_messages(beamline_acronym, csv_path=csv_path,
dummy_test=dummy_test, plqy=PLQY,
dummy_test=dummy_kafka, plqy=PLQY,
key_height=key_height, height=height, distance=distance,
):
print(f"Listening for Kafka messages for {beamline_acronym}")
Expand Down Expand Up @@ -256,30 +265,31 @@ def print_message(consumer, doctype, doc):
### Three parameters for ML: peak_emission, fwhm, plqy
# TODO: add ML agent code here

# prediction = ML(peak_emission, fwhm, plqy)

table = pd.DataFrame(index=[0])
if USE_AGENT:

# DOFs
table.loc[0, "infusion_rate_1"] = metadata_dic["infuse_rate"][0]
table.loc[0, "infusion_rate_2"] = metadata_dic["infuse_rate"][1]
table.loc[0, "infusion_rate_3"] = metadata_dic["infuse_rate"][2]
table = pd.DataFrame(index=[0])

# DOFs
table.loc[0, "infusion_rate_1"] = metadata_dic["infuse_rate"][0]
table.loc[0, "infusion_rate_2"] = metadata_dic["infuse_rate"][1]
# table.loc[0, "infusion_rate_3"] = metadata_dic["infuse_rate"][2]

# Objectives
table.loc[0, "peak_emission"] = peak_emission
table.loc[0, "peak_fwhm"] = fwhm
table.loc[0, "plqy"] = plqy


agent.tell(table, append=True)
# Objectives
table.loc[0, "peak_emission"] = peak_emission
table.loc[0, "peak_fwhm"] = fwhm
table.loc[0, "plqy"] = plqy


if len(agent.table) < 4:
acq_func = "qr"
else:
acq_func = "qei"
agent.tell(table, append=True)

if len(agent.table) < 2:
acq_func = "qr"
else:
acq_func = "qei"

new_points, _ = agent.ask(acq_func, n=1)
new_points, _ = agent.ask(acq_func, n=1)



Expand Down
65 changes: 39 additions & 26 deletions scripts/kafka_consumer_with_qserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@
plt.rcParams["figure.raise_window"] = False

## Input variables: read from inputs_qserver_kafka.xlsx
xlsx = '/home/xf28id2/Documents/ChengHung/inputs_qserver_kafka_dilute.xlsx'
xlsx = '/home/xf28id2/Documents/ChengHung/inputs_qserver_kafka_ML.xlsx'
input_dic = de._read_input_xlsx(xlsx)

##################################################################
# Define namespace for tasks in Qserver and Kafka
dummy_test = bool(input_dic['dummy_test'][0])
dummy_kafka = bool(input_dic['dummy_test'][0])
dummy_qserver = bool(input_dic['dummy_test'][1])
csv_path = input_dic['csv_path'][0]
key_height = input_dic['key_height']
height = input_dic['height']
Expand All @@ -62,24 +63,33 @@
from bloptools.bayesian import Agent, DOF, Objective

dofs = [
DOF(name="infusion_rate_1", limits=(10, 100)),
DOF(name="infusion_rate_2", limits=(10, 100)),
DOF(name="infusion_rate_3", limits=(10, 100)),
DOF(name="infusion_rate_1", limits=(30, 150)),
DOF(name="infusion_rate_2", limits=(30, 150)),
# DOF(name="infusion_rate_3", limits=(1500, 2000)),
]

objectives = [
Objective(name="Peak emission", key="peak_emission"),
Objective(name="Peak width", key="peak_fwhm"),
Objective(name="Quantum yield", key="plqy"),
Objective(name="Peak emission", key="peak_emission", target=500),
Objective(name="Peak width", key="peak_fwhm", target="min"),
Objective(name="Quantum yield", key="plqy", target="max"),
]


# objectives = [
# Objective(name="Peak emission", key="peak_emission", target=525, units="nm"),
# Objective(name="Peak width", key="peak_fwhm", minimize=True, units="nm"),
# Objective(name="Quantum yield", key="plqy"),
# ]


USE_AGENT = False
agent = Agent(dofs=dofs, objectives=objectives, db=None, verbose=True)


def print_kafka_messages(beamline_acronym, csv_path=csv_path,
key_height=key_height, height=height, distance=distance,
pump_list=pump_list, sample=sample, precursor_list=precursor_list,
mixer=mixer, dummy_test=dummy_test, plqy=PLQY):
mixer=mixer, dummy_test=dummy_kafka, plqy=PLQY):

print(f"Listening for Kafka messages for {beamline_acronym}")
print(f'Defaul parameters:\n'
Expand Down Expand Up @@ -255,30 +265,33 @@ def print_message(consumer, doctype, doc,
### Three parameters for ML: peak_emission, fwhm, plqy
# TODO: add ML agent code here

# prediction = ML(peak_emission, fwhm, plqy)
if USE_AGENT:

print(f'\n** Send peak_emission, FWHM, PLQY to ML agent**\n')
table = pd.DataFrame(index=[0])

table = pd.DataFrame(index=[0])
# DOFs
table.loc[0, "infusion_rate_1"] = metadata_dic["infuse_rate"][0]
table.loc[0, "infusion_rate_2"] = metadata_dic["infuse_rate"][1]
# table.loc[0, "infusion_rate_3"] = metadata_dic["infuse_rate"][2]

# DOFs
table.loc[0, "infusion_rate_1"] = metadata_dic["infuse_rate"][0]
table.loc[0, "infusion_rate_2"] = metadata_dic["infuse_rate"][1]
table.loc[0, "infusion_rate_3"] = metadata_dic["infuse_rate"][2]

# Objectives
table.loc[0, "peak_emission"] = peak_emission
table.loc[0, "peak_fwhm"] = fwhm
table.loc[0, "plqy"] = plqy


# Objectives
table.loc[0, "peak_emission"] = peak_emission
table.loc[0, "peak_fwhm"] = fwhm
table.loc[0, "plqy"] = plqy

agent.tell(table, append=True)

agent.tell(table, append=True)
if len(agent.table) < 2:
acq_func = "qr"
else:
acq_func = "qei"

if len(agent.table) < 4:
acq_func = "qr"
else:
acq_func = "qei"
new_points, _ = agent.ask(acq_func, n=1)

new_inputs = agent.ask(acq_func, n=1)
# print(f'\n** ML prediction: {new_points}**\n')


# ...
Expand Down
Loading

0 comments on commit 98e73bc

Please sign in to comment.