diff --git a/abr-testing/abr_testing/protocols/active_protocols/11_Dynabeads_IP_Flex_96well_RIT.py b/abr-testing/abr_testing/protocols/active_protocols/11_Dynabeads_IP_Flex_96well_RIT.py index 44db654cc1f..ce389d811c1 100644 --- a/abr-testing/abr_testing/protocols/active_protocols/11_Dynabeads_IP_Flex_96well_RIT.py +++ b/abr-testing/abr_testing/protocols/active_protocols/11_Dynabeads_IP_Flex_96well_RIT.py @@ -57,7 +57,7 @@ def run(protocol: ProtocolContext) -> None: single_channel_mount = protocol.params.pipette_mount_1 # type: ignore[attr-defined] eight_channel_mount = protocol.params.pipette_mount_2 # type: ignore[attr-defined] deactivate_modules_bool = protocol.params.deactivate_modules # type: ignore[attr-defined] - helpers.comment_protocol_version(protocol, "01") + helpers.comment_protocol_version(protocol, "02") MIX_SPEED = heater_shaker_speed MIX_SEC = 10 @@ -65,35 +65,33 @@ def run(protocol: ProtocolContext) -> None: # if on deck: INCUBATION_SPEED = heater_shaker_speed * 0.5 INCUBATION_MIN = 60 - # load labware - - sample_plate_1 = protocol.load_labware( - "nest_96_wellplate_2ml_deep", "B2", "sample plate 1" + # load labware stacker + stacker_deep_wells = helpers.load_stacker_module( + protocol, "PS241204SZEVT22", "C4", "nest_96_wellplate_2ml_deep" ) - sample_plate_2 = protocol.load_labware( - "nest_96_wellplate_2ml_deep", "C4", "sample plate 2" - ) - + # load labware + sample_plate_1 = stacker_deep_wells.unload_and_move_labware("B2") wash_res = protocol.load_labware("nest_12_reservoir_15ml", "B1", "wash") reagent_res = protocol.load_labware( "opentrons_15_tuberack_nest_15ml_conical", "C3", "reagents" ) waste_res = protocol.load_labware("nest_1_reservoir_290ml", "D2", "Liquid Waste") - - tips = protocol.load_labware("opentrons_flex_96_tiprack_1000ul", "B3") - tips_sample = protocol.load_labware( - "opentrons_flex_96_tiprack_1000ul", "A2", "sample tips" + # stacker 1: 1000 ul tips + tip_stacker_1000 = helpers.load_stacker_module( + protocol, "PS241204SZEVT25", "B4", "opentrons_flex_96_tiprack_1000ul" ) + # Unload tip racks + tips = tip_stacker_1000.unload_and_move_labware("B3") + tips_sample = tip_stacker_1000.unload_and_move_labware("A2") tips_sample_loc = tips_sample.wells()[:95] + tips_reused = tip_stacker_1000.unload_and_move_labware("C2") + tips_reused_loc = tips_reused.wells()[:95] + tip_rack_list: List[Labware] = [tips, tips_sample, tips_reused] if READY_FOR_SDSPAGE == 0: - tips_elu = protocol.load_labware( - "opentrons_flex_96_tiprack_1000ul", "A1", "elution tips" - ) + tips_elu = tip_stacker_1000.unload_and_move_labware("A1") tips_elu_loc = tips_elu.wells()[:95] - tips_reused = protocol.load_labware( - "opentrons_flex_96_tiprack_1000ul", "C2", "reused tips" - ) - tips_reused_loc = tips_reused.wells()[:95] + tip_rack_list.append(tips_elu) + p1000 = protocol.load_instrument( "flex_8channel_1000", eight_channel_mount, tip_racks=[tips] ) @@ -103,24 +101,21 @@ def run(protocol: ProtocolContext) -> None: h_s: HeaterShakerContext = protocol.load_module( helpers.hs_str, "D1" ) # type: ignore[assignment] - working_plate, h_s_adapter = helpers.load_hs_adapter_and_labware( - "nest_96_wellplate_2ml_deep", h_s, "Working Plate" - ) + h_s_adapter = h_s.load_adapter("opentrons_96_deep_well_adapter") + h_s.open_labware_latch() + working_plate = stacker_deep_wells.unload_and_move_labware(h_s_adapter) if READY_FOR_SDSPAGE == 0: temp: TemperatureModuleContext = protocol.load_module( helpers.temp_str, "D3" ) # type: ignore[assignment] - final_plate, temp_adapter = helpers.load_temp_adapter_and_labware( - "nest_96_wellplate_2ml_deep", temp, "Final Plate" - ) + temp_adapter = temp.load_adapter("opentrons_96_deep_well_adapter") + final_plate = stacker_deep_wells.unload_and_move_labware(temp_adapter) mag: MagneticBlockContext = protocol.load_module( helpers.mag_str, "C1" ) # type: ignore[assignment] - # liquids samples1 = sample_plate_1.rows()[0][:NUM_COL] # 1 - samples2 = sample_plate_2.rows()[0][:NUM_COL] # 1 beads = reagent_res.wells()[0] # 2 ab = reagent_res.wells()[1] # 3 elu = reagent_res.wells()[2] # 4 @@ -139,7 +134,6 @@ def run(protocol: ProtocolContext) -> None: "Elution": [{"well": elu, "volume": 9800.0}], "Wash": [{"well": wash, "volume": 1500.0}], "Samples 1": [{"well": samples1, "volume": 250.0}], - "Samples 2": [{"well": samples2, "volume": 250.0}], } helpers.find_liquid_height_of_loaded_liquids( protocol, liquid_vols_and_wells, p1000_single @@ -289,11 +283,21 @@ def run(sample_plate: Labware) -> None: run(sample_plate_1) # swap plates - protocol.move_labware(sample_plate_1, "B4", True) - protocol.move_labware(sample_plate_2, "B2", True) + helpers.move_labware_to_destination( + protocol=protocol, + labware=sample_plate_1, + dest="B4", + use_gripper=True, + flex_stacker=True, + ) + sample_plate_2 = stacker_deep_wells.unload_and_move_labware("B2") + stacker_deep_wells.move_and_store_labware(sample_plate_1) + run(sample_plate_2) helpers.clean_up_plates(p1000_single, [wash_res], waste, 1000) helpers.find_liquid_height_of_all_wells(protocol, p1000_single, [waste_res["A1"]]) + # reload tip racks and labware + helpers.move_to_stacker_and_store(tip_stacker_1000, tip_rack_list) if deactivate_modules_bool: helpers.deactivate_modules(protocol) diff --git a/abr-testing/abr_testing/protocols/active_protocols/12_KAPA HyperPlus Library Prep.py b/abr-testing/abr_testing/protocols/active_protocols/12_KAPA HyperPlus Library Prep.py index ff38e8cf7c7..e2652b23a2b 100644 --- a/abr-testing/abr_testing/protocols/active_protocols/12_KAPA HyperPlus Library Prep.py +++ b/abr-testing/abr_testing/protocols/active_protocols/12_KAPA HyperPlus Library Prep.py @@ -14,7 +14,7 @@ MagneticBlockContext, ThermocyclerContext, ) -from typing import List, Tuple, Dict +from typing import List, Dict, Tuple metadata = { "protocolName": "KAPA HyperPlus Library Preparation", @@ -39,8 +39,8 @@ def add_parameters(parameters: ParameterContext) -> None: default=False, ) helpers.create_disposable_lid_parameter(parameters) - helpers.create_tc_lid_deck_riser_parameter(parameters) helpers.create_two_pipette_mount_parameters(parameters) + helpers.create_tc_lid_deck_riser_parameter(parameters) helpers.create_deactivate_modules_parameter(parameters) parameters.add_int( variable_name="num_samples", @@ -78,7 +78,7 @@ def run(protocol: ProtocolContext) -> None: pipette_1000_mount = protocol.params.pipette_mount_1 # type: ignore[attr-defined] pipette_50_mount = protocol.params.pipette_mount_2 # type: ignore[attr-defined] deck_riser = protocol.params.deck_riser # type: ignore[attr-defined] - helpers.comment_protocol_version(protocol, "01") + helpers.comment_protocol_version(protocol, "02") REUSE_ETOH_TIPS = True REUSE_RSB_TIPS = ( @@ -95,6 +95,10 @@ def run(protocol: ProtocolContext) -> None: trash_tips = False num_cols = math.ceil(num_samples / 8) + unused_lids: List[Labware] = [] + # Load TC Lids + if disposable_lid: + unused_lids = helpers.load_disposable_lids(protocol, 5, ["D3"], deck_riser) # Pre-set parameters # sample_vol = 35.0 @@ -121,10 +125,15 @@ def run(protocol: ProtocolContext) -> None: temp_mod: TemperatureModuleContext = protocol.load_module( helpers.temp_str, "B3" ) # type: ignore[assignment] - temp_plate, temp_adapter = helpers.load_temp_adapter_and_labware( - "armadillo_96_wellplate_200ul_pcr_full_skirt", - temp_mod, - "Temp Module Reservoir Plate", + temp_adapter = temp_mod.load_adapter("opentrons_96_well_aluminum_block") + tip_stacker_200 = helpers.load_stacker_module( + protocol, "PS241204SZEVT24", "D4", "opentrons_flex_96_tiprack_200ul" + ) + tip_stacker_50 = helpers.load_stacker_module( + protocol, "PS241204SZEVT26", "A4", "opentrons_flex_96_tiprack_50ul" + ) + tip_stacker_plates = helpers.load_stacker_module( + protocol, "PS241204SZEVT18", "C4", "armadillo_96_wellplate_200ul_pcr_full_skirt" ) if not dry_run: @@ -132,38 +141,36 @@ def run(protocol: ProtocolContext) -> None: tc_mod: ThermocyclerContext = protocol.load_module(helpers.tc_str) # type: ignore[assignment] # Just in case tc_mod.open_lid() + # Unload Armadillo plates + FLP_plate = tip_stacker_plates.unload_and_move_labware(magblock) - FLP_plate = magblock.load_labware( - "armadillo_96_wellplate_200ul_pcr_full_skirt", "FLP Plate" - ) samples_flp = FLP_plate.rows()[0][:num_cols] - - sample_plate = protocol.load_labware( - "armadillo_96_wellplate_200ul_pcr_full_skirt", "D1", "Sample Plate 1" - ) - - sample_plate_2 = protocol.load_labware( - "armadillo_96_wellplate_200ul_pcr_full_skirt", "B2", "Sample Plate 2" - ) + sample_plate = tip_stacker_plates.unload_and_move_labware("D1") + sample_plate_2 = tip_stacker_plates.unload_and_move_labware("B2") + temp_plate = tip_stacker_plates.unload_and_move_labware(temp_adapter) samples_2 = sample_plate_2.rows()[0][:num_cols] samples = sample_plate.rows()[0][:num_cols] reservoir = protocol.load_labware( "nest_96_wellplate_2ml_deep", "C2", "Beads + Buffer + Ethanol" ) - # Load tipracks - tiprack_50_1 = protocol.load_labware("opentrons_flex_96_tiprack_50ul", "A3") - tiprack_50_2 = protocol.load_labware("opentrons_flex_96_tiprack_50ul", "A2") - - tiprack_200_1 = protocol.load_labware("opentrons_flex_96_tiprack_200ul", "C1") - tiprack_200_2 = protocol.load_labware("opentrons_flex_96_tiprack_200ul", "C3") + # Unload tipracks + tiprack_50_locations = ["A2", "A3"] + tiprack_200_locations = ["C1", "C3"] + tipracks_50 = [] + tipracks_200 = [] + + # Unload 50 ul tips + for tip_rack_destination_50 in tiprack_50_locations: + tip_rack_50 = tip_stacker_50.unload_and_move_labware(tip_rack_destination_50) + tipracks_50.append(tip_rack_50) + # Unload 200 ul tips + for tip_rack_destination_200 in tiprack_200_locations: + tip_rack_200 = tip_stacker_200.unload_and_move_labware(tip_rack_destination_200) + tipracks_200.append(tip_rack_200) if trash_tips: protocol.load_waste_chute() - unused_lids: List[Labware] = [] - # Load TC Lids - if disposable_lid: - unused_lids = helpers.load_disposable_lids(protocol, 5, ["C4"], deck_riser) # Import Global Variables global tip50 @@ -175,10 +182,10 @@ def run(protocol: ProtocolContext) -> None: p200 = protocol.load_instrument( "flex_8channel_1000", pipette_1000_mount, - tip_racks=[tiprack_200_1, tiprack_200_2], + tip_racks=tipracks_200, ) p50 = protocol.load_instrument( - "flex_8channel_50", pipette_50_mount, tip_racks=[tiprack_50_1, tiprack_50_2] + "flex_8channel_50", pipette_50_mount, tip_racks=tipracks_50 ) # Load Reagent Locations in Reservoirs diff --git a/abr-testing/abr_testing/protocols/active_protocols/4_Illumina DNA Enrichment.py b/abr-testing/abr_testing/protocols/active_protocols/4_Illumina DNA Enrichment.py index ff9a9807c92..df5b7316a9a 100644 --- a/abr-testing/abr_testing/protocols/active_protocols/4_Illumina DNA Enrichment.py +++ b/abr-testing/abr_testing/protocols/active_protocols/4_Illumina DNA Enrichment.py @@ -75,7 +75,7 @@ def run(protocol: ProtocolContext) -> None: deck_riser = protocol.params.deck_riser # type: ignore[attr-defined] trash_lid = protocol.params.trash_lid # type: ignore[attr-defined] deactivate_modules_bool = protocol.params.deactivate_modules # type: ignore[attr-defined] - helpers.comment_protocol_version(protocol, "01") + helpers.comment_protocol_version(protocol, "02") unused_lids: List[Labware] = [] used_lids: List[Labware] = [] @@ -333,7 +333,12 @@ def tipcheck() -> None: unused_lids, used_lids, ) = helpers.use_disposable_lid_with_tc( - protocol, unused_lids, used_lids, sample_plate_1, thermocycler + protocol, + unused_lids, + used_lids, + sample_plate_1, + thermocycler, + flex_stacker=True, ) else: thermocycler.close_lid() @@ -417,6 +422,7 @@ def tipcheck() -> None: used_lids, sample_plate_1, thermocycler, + flex_stacker=True, ) else: thermocycler.close_lid() @@ -736,6 +742,7 @@ def tipcheck() -> None: used_lids, sample_plate_1, thermocycler, + flex_stacker=True, ) else: thermocycler.close_lid() diff --git a/abr-testing/abr_testing/protocols/active_protocols/5_96ch complex protocol with single tip Pick Up.py b/abr-testing/abr_testing/protocols/active_protocols/5_96ch complex protocol with single tip Pick Up.py index ca7506cf6f0..9e9648949e6 100644 --- a/abr-testing/abr_testing/protocols/active_protocols/5_96ch complex protocol with single tip Pick Up.py +++ b/abr-testing/abr_testing/protocols/active_protocols/5_96ch complex protocol with single tip Pick Up.py @@ -130,21 +130,42 @@ def move_to_locations( def reset_labware() -> None: """Reset the labware to the reset location.""" - protocol.move_labware( - labware_to_move, reset_location, use_gripper=use_gripper - ) + if reset_location in ["C4"] or helpers.get_parent(labware_to_move) in [ + "C4" + ]: + helpers.move_labware_to_destination( + protocol=protocol, + labware=labware_to_move, + dest=reset_location, + use_gripper=True, + flex_stacker=True, + ) + else: + protocol.move_labware( + labware_to_move, reset_location, use_gripper=use_gripper + ) if len(move_locations) == 0: return for location in move_locations: - protocol.move_labware( - labware_to_move, location, use_gripper=use_gripper - ) + if location in ["C4"] or helpers.get_parent(labware_to_move) in [ + "C4", + ]: + helpers.move_labware_to_destination( + protocol=protocol, + labware=labware_to_move, + dest=location, + use_gripper=True, + flex_stacker=True, + ) + else: + protocol.move_labware( + labware_to_move, location, use_gripper=use_gripper + ) if reset_after_each_move: reset_labware() - if not reset_after_each_move: reset_labware() @@ -214,7 +235,6 @@ def staging_area_slot_4_moves(labware: Labware, reset_location: str) -> None: h_s_adapter, ], # Module Moves ] - run_moves( labware, staging_area_slot_4_move_sequence, @@ -255,20 +275,28 @@ def module_moves(labware: Labware, module_locations: List) -> None: ) staging_area_slot_3_moves(dest_pcr_plate, STAGING_AREA_SLOT_3_RESET_LOCATION) - protocol.move_labware( - dest_pcr_plate, - STAGING_AREA_SLOT_4_RESET_LOCATION, + helpers.move_labware_to_destination( + protocol=protocol, + labware=dest_pcr_plate, + dest=STAGING_AREA_SLOT_4_RESET_LOCATION, use_gripper=USING_GRIPPER, + flex_stacker=True, ) staging_area_slot_4_moves(dest_pcr_plate, STAGING_AREA_SLOT_4_RESET_LOCATION) - module_locations = [thermocycler] + adapters module_moves(dest_pcr_plate, module_locations) + protocol.move_labware(dest_pcr_plate, thermocycler, use_gripper=USING_GRIPPER) def test_manual_moves() -> None: """Test manual moves.""" - protocol.move_labware(source_reservoir, "D4", use_gripper=USING_GRIPPER) + helpers.move_labware_to_destination( + protocol=protocol, + labware=source_reservoir, + dest="D4", + use_gripper=USING_GRIPPER, + flex_stacker=True, + ) def test_pipetting() -> None: """Test pipetting.""" @@ -306,7 +334,13 @@ def test_column_tip_rack_usage() -> None: ) protocol.comment("------------------------------") protocol.comment(f"channels {pipette_96_channel.active_channels}") - protocol.move_labware(tip_rack_3, "C3", use_gripper=USING_GRIPPER) + helpers.move_labware_to_destination( + protocol=protocol, + labware=tip_rack_3, + dest="C3", + use_gripper=USING_GRIPPER, + flex_stacker=True, + ) for well in list_of_columns: tiprack_well = "A" + str(well) well_name = "A" + str(well) diff --git a/abr-testing/abr_testing/protocols/helpers.py b/abr-testing/abr_testing/protocols/helpers.py index 31a1d1a9244..9ab81bc23e1 100644 --- a/abr-testing/abr_testing/protocols/helpers.py +++ b/abr-testing/abr_testing/protocols/helpers.py @@ -15,9 +15,10 @@ MagneticModuleContext, AbsorbanceReaderContext, ) -from typing import List, Union, Dict, Tuple +from typing import List, Union, Dict, Tuple, Any from opentrons.hardware_control.modules.types import ThermocyclerStep from opentrons_shared_data.errors.exceptions import PipetteLiquidNotFoundError +from opentrons.drivers.stacker.slas_demo import StackerModule # FUNCTIONS FOR LOADING COMMON CONFIGURATIONS @@ -411,10 +412,18 @@ def use_disposable_lid_with_tc( used_lids: List[Labware], plate_in_thermocycler: Labware, thermocycler: ThermocyclerContext, + flex_stacker: bool = False, ) -> Tuple[Labware, List[Labware], List[Labware]]: """Use disposable lid with thermocycler.""" + x = y = z = 0.0 lid_on_plate = unused_lids[0] - protocol.move_labware(lid_on_plate, plate_in_thermocycler, use_gripper=True) + if flex_stacker: + z = 12 + x = -3 + offsets = {"x": x, "y": y, "z": z} + protocol.move_labware( + lid_on_plate, plate_in_thermocycler, use_gripper=True, pick_up_offset=offsets + ) # Remove lid from the list unused_lids.pop(0) used_lids.append(lid_on_plate) @@ -597,6 +606,108 @@ def load_wells_with_water( well.load_liquid(water, volume) +# Functions for gripper movements + + +def move_labware_to_destination( + protocol: ProtocolContext, + labware: Labware, + dest: Any, + use_gripper: bool = True, + flex_stacker: bool = False, +) -> None: + """Move labware from one location another.""" + x = y = z = 0.0 + + if flex_stacker: + z = 12 + x = -3 + offsets = {"x": x, "y": y, "z": z} + if isinstance(dest, str): + try: + if dest[1] == "4": + protocol.move_labware( + labware=labware, + new_location=dest, + use_gripper=use_gripper, + drop_offset=offsets, + ) + return + except IndexError: + print("OT-2") + return + elif isinstance(dest, Labware): + labware_location = get_parent(dest) # type: ignore + if str(labware_location)[1] == "4": + protocol.move_labware( + labware=labware, + new_location=dest, + use_gripper=use_gripper, + drop_offset=offsets, + ) + else: + protocol.move_labware( + labware=labware, + new_location=dest, + use_gripper=use_gripper, + pick_up_offset=offsets, + ) + print(f"{labware.load_name} was moved to {labware.parent}") + + +def get_parent(labware: Labware) -> Any: + """Helper to get labware location.""" + labware_location = labware.parent # type: ignore + if isinstance(labware_location, Labware): + return get_parent(labware_location) + else: + return labware_location + + +# Stacker Functions +# TODO add more stacker functions and incorporate them into protocols +def load_stacker_module( + protocol: ProtocolContext, serial_number: str, slot: str, labware_name: str +) -> StackerModule: + """Load stacker module.""" + hardware = protocol._hw_manager.hardware + hardware.cache_instruments() + return StackerModule( + serial_number=serial_number, + labware_name=labware_name, + slot=slot, + protocol=protocol, + ) + + +def unload_and_move( + stacker: StackerModule, + new_location: Labware | str, +) -> None: + """Move labware off stacker into deck.""" + stacker.unload_and_move_labware(new_location=new_location) + + +def move_to_stacker_and_store( + stacker: StackerModule, list_of_labware: List[Labware] +) -> None: + """Move labware from deck into stacker.""" + for labware in list_of_labware: + stacker.move_and_store_labware(lw=labware) + + +def move_lid_from_stacker_to_tc_plate( + stacker: StackerModule, + plate_on_thermocycler: Labware, + thermocycler: ThermocyclerContext, +) -> Labware: + """Move lid from stacker to thermocycler plate.""" + thermocycler.open_lid() + lid_on_plate = stacker.unload_and_move_labware(plate_on_thermocycler) + thermocycler.close_lid() + return lid_on_plate + + # CONSTANTS hs_str = "heaterShakerModuleV1" diff --git a/abr-testing/test_debug b/abr-testing/test_debug index e69de29bb2d..49cf77c4fc8 100644 --- a/abr-testing/test_debug +++ b/abr-testing/test_debug @@ -0,0 +1,42 @@ +Exception raised by protocol +Traceback (most recent call last): + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocols\execution\execute_python.py", line 160, in exec_run + exec("run(__context)", new_globs) + File "", line 1, in + File "tc_lid_x_offset_test.py", line 99, in run + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocols\api_support\util.py", line 389, in _check_version_wrapper + return decorated_obj(*args, **kwargs) + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_api\labware.py", line 560, in load_labware + labware_core = self._protocol_core.load_labware( + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_api\core\engine\protocol.py", line 225, in load_labware + load_result = self._engine_client.execute_command_without_recovery( + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_engine\clients\sync_client.py", line 120, in execute_command_without_recovery + return self._transport.execute_command(create_request) + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_engine\clients\transports.py", line 84, in execute_command + raise ProtocolCommandFailedError( +opentrons.protocol_engine.errors.error_occurrence.ProtocolCommandFailedError: Error 4000 GENERAL_ERROR (ProtocolCommandFailedError): PythonException: ValueError: Labware Lid opentrons_tough_pcr_auto_sealing_lid may not be loaded on parent labware Opentrons Flex Deck Riser. + +The above exception was the direct cause of the following exception: + +Traceback (most recent call last): + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_runner\task_queue.py", line 84, in _run + await self._run_func() + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_runner\task_queue.py", line 61, in _do_run + await func(*args, **kwargs) + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_runner\protocol_runner.py", line 254, in run_func + await self._protocol_executor.execute( + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocol_runner\python_protocol_wrappers.py", line 159, in execute + await to_thread.run_sync( + File "C:\Users\Rhyann Clarke\.virtualenvs\abr-testing-6acHInfl\lib\site-packages\anyio\to_thread.py", line 33, in run_sync + return await get_asynclib().run_sync_in_worker_thread( + File "C:\Users\Rhyann Clarke\.virtualenvs\abr-testing-6acHInfl\lib\site-packages\anyio\_backends\_asyncio.py", line 877, in run_sync_in_worker_thread + return await future + File "C:\Users\Rhyann Clarke\.virtualenvs\abr-testing-6acHInfl\lib\site-packages\anyio\_backends\_asyncio.py", line 807, in run + result = context.run(func, *args) + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocols\execution\execute.py", line 39, in run_protocol + exec_run( + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocols\execution\execute_python.py", line 169, in exec_run + _raise_pretty_protocol_error(exception=e, filename=filename) + File "c:\users\rhyann clarke\opentrons\api\src\opentrons\protocols\execution\execute_python.py", line 71, in _raise_pretty_protocol_error + raise ExceptionInProtocolError( +opentrons.protocols.execution.errors.ExceptionInProtocolError: ProtocolCommandFailedError [line 99]: Error 4000 GENERAL_ERROR (ProtocolCommandFailedError): PythonException: ValueError: Labware Lid opentrons_tough_pcr_auto_sealing_lid may not be loaded on parent labware Opentrons Flex Deck Riser. diff --git a/api/src/opentrons/drivers/stacker/__init__.py b/api/src/opentrons/drivers/stacker/__init__.py new file mode 100644 index 00000000000..4f3d0dd8e8e --- /dev/null +++ b/api/src/opentrons/drivers/stacker/__init__.py @@ -0,0 +1,4 @@ +"""Flex Stacker drivers.""" +from .flex_stacker_driver import FlexStacker, LABWARE_Z_HEIGHT, AXIS, GCODE, DIR + +__all__ = ["FlexStacker", "LABWARE_Z_HEIGHT", "AXIS", "GCODE", "DIR"] diff --git a/api/src/opentrons/drivers/stacker/flex_stacker_driver.py b/api/src/opentrons/drivers/stacker/flex_stacker_driver.py new file mode 100644 index 00000000000..437199c34fc --- /dev/null +++ b/api/src/opentrons/drivers/stacker/flex_stacker_driver.py @@ -0,0 +1,672 @@ +import serial +from serial import Serial # type: ignore[import] +from abc import ABC, abstractmethod +import time +from typing import Tuple +import re +from datetime import datetime +from enum import Enum +from opentrons.drivers.command_builder import CommandBuilder +import math +from typing import List, Optional, Iterator + +import serial.tools +import serial.tools.list_ports + +class GCODE(str, Enum): + CR = '\r\n', + MOVE_DIST = 'G0', + MOVE_MIRCROSTEP = 'G0.S', + MOVE_LS = 'G5', + LIMITSWITCH_STATUS = 'M119' + CURRENT_MOTION_PARMS = 'M120' + PLATFORM_STATUS = 'M121' + ENABLE_MOTOR = 'M17' + DISABLE_MOTOR = 'M18' + WRITE_TO_REGISTER = 'M921' + READ_FROM_REGISTER = 'M920' + SET_PEAK_CURRENT = 'M906' + SET_IHOLD_CURRENT = 'M907' + SET_MICROSTEPPING = 'M909' + STALLGUARD = 'M910' + GET_STALLGUARD_VAL = 'M911' + SET_SERIAL_NUM = 'M996' + DEVICE_INFO = 'M115' + + +class DIR(str, Enum): + POSITIVE = '', + NEGATIVE = '-', + NEGATIVE_HOME = '0' + POSITIVE_HOME = '1' + +class AXIS(str, Enum): + X = 'X', + Z = 'Z', + L = 'L', + +class LABWARE_Z_HEIGHT(float, Enum): + BIORAD_HARDSHELL_PCR = 24.0, + OPENTRONS_TIPRACKS = 122, + DEEPWELL_96 = 40.5, + FLEX_STACKER_PLATFORM = 8.4, + NEST_200_ul_PCR_PLATE = 15.5, + NEST_96_WELL_PLATE_FLATBOTTOM = 18, + NEST_96_WELL_PLATE_FLATBOTTOM_WITH_LID = 16, + NEST_96_DEEP_WELL_PLATE_VBOTTOM = 39.1, + NEST_12_DEEP_WEEL_PLATE_VBOTTOM = 29.75, + CORSTAR_24_WELL_WITH_LID = 16*2, + CORSTAR_24_WELL_WITHOUT_LID = 16*2, + SARSTEDT_PCR_PLATE_FULLSKIRT = 16, + ARMADILLO_384_PLATE = 15.5 + THERMOCYLER_LID_WITH_ADAPTER = 40 + +FS_BAUDRATE = 115200 +DEFAULT_FS_TIMEOUT = 0.1 +FS_COMMAND_TERMINATOR = "\r\n" +FS_ACK = "OK"+ FS_COMMAND_TERMINATOR.strip("\r") +FS_STALL = "async ERR403:motor stall error" + FS_COMMAND_TERMINATOR.strip("\r") +DEFAULT_COMMAND_RETRIES = 1 +TOTAL_TRAVEL_X = 192.5 +TOTAL_TRAVEL_Z = 136 +TOTAL_TRAVEL_L = 22 +RETRACT_DIST_X = 1 +RETRACT_DIST_Z = 1 +HOME_SPEED = 10 +HOME_SPEED_L = 100 +HOME_ACCELERATION = 100 +HOME_ACCELERATION_L = 800 +MOVE_ACCELERATION_X = 1500 +MOVE_ACCELERATION_Z = 500 +MOVE_ACCELERATION_L = 800 +MAX_SPEED_DISCONTINUITY_X = 40 +MAX_SPEED_DISCONTINUITY_Z = 40 +MAX_SPEED_DISCONTINUITY_L = 40 +HOME_CURRENT_X = 1.5 +HOME_CURRENT_Z = 1.5 +HOME_CURRENT_L = 0.8 +MOVE_CURRENT_X = 1.0 +MOVE_CURRENT_Z = 1.5 +MOVE_CURRENT_L = 0.6 +MOVE_SPEED_X = 200 +MOVE_SPEED_UPZ = 200 +MOVE_SPEED_L = 100 +MOVE_SPEED_DOWNZ = 200 +x_sg_value = 8 +z_sg_value = 16 +l_sg_value = 8 + +class FlexStacker(): + """Flex Stacker Driver.""" + + def __init__(self, connection: Serial) -> None: + """ + Constructor + + Args: + connection: SerialConnection to the plate stacker + """ + self._stacker_connection = connection + self._ack = FS_ACK.encode() + self._stall = FS_STALL.encode() + self.move_speed_x = MOVE_SPEED_X + self.move_speed_up_z = MOVE_SPEED_UPZ + self.move_speed_down_z = MOVE_SPEED_DOWNZ + self.home_acceleration = HOME_ACCELERATION + self.home_acceleration_l = HOME_ACCELERATION_L + self.home_speed = HOME_SPEED + self.home_speed_l = HOME_SPEED_L + self.move_acceleration_x = MOVE_ACCELERATION_X + self.move_acceleration_z = MOVE_ACCELERATION_Z + self.move_acceleration_l = MOVE_ACCELERATION_L + self.max_speed_discontinuity_x = MAX_SPEED_DISCONTINUITY_X + self.max_speed_discontinuity_z = MAX_SPEED_DISCONTINUITY_Z + self.max_speed_discontinuity_l = MAX_SPEED_DISCONTINUITY_L + self.current_position = {'X': None, 'Z': None, 'L': None} + self.x_sg_value = x_sg_value + self.z_sg_value = z_sg_value + self.l_sg_value = l_sg_value + # self.__class__.__name__ == 'FlexStacker' + + @classmethod + def create(cls, port: str, baudrate: int = 115200, timeout: float = 1.0) -> "FlexStacker": + """Flex Stacker Driver""" + conn = Serial(port = port, baudrate = baudrate, timeout = timeout) + return cls(connection = conn) + + @classmethod + def create_from_sn(cls, sn: str, baudrate: int = 115200, timeout: float = 1.0) -> "FlexStacker": + """Flex Stacker Driver""" + port = None + for comport in serial.tools.list_ports.comports(): + if comport.serial_number is sn: + port = comport.device + break + if not port: + raise ValueError(f"Could not find connected stacker with serial number {sn}") + + return cls.create(port=port, baudrate=baudrate, timeout=timeout) + + def setup_stall_detection(self): + self.enable_SG(AXIS.X, self.x_sg_value, True) + self.enable_SG(AXIS.Z, self.z_sg_value, True) + self.enable_SG(AXIS.L, self.l_sg_value, True) + + def send_command( + self, command: CommandBuilder, retries: int = 0, timeout: Optional[float] = None + ) -> str: + """ + Send a command and return the response. + + Args: + command: A command builder. + retries: number of times to retry in case of timeout + timeout: optional override of default timeout in seconds + + Returns: The command response + + Raises: SerialException + """ + return self._send_data( + data=command.build(), retries=retries, timeout=DEFAULT_FS_TIMEOUT + ) + + def _send_data(self, data: str, retries: int = 0, timeout: Optional[float] = None) -> str: + """ + Send data and return the response. + + Args: + data: The data to send. + retries: number of times to retry in case of timeout + + Returns: The command response + + Raises: SerialException + """ + data_encode = data.encode() + self._stacker_connection.write(data=data_encode) + start = time.time() + while True: + response = self._stacker_connection.readline() + print(response) + #if (self._ack in response) or (self._stall in response): + if (self._ack in response): + # Remove ack from response + response = response.replace(self._ack, b"OK\n") + str_response = self.process_raw_response( + command=data, response=response.decode() + ) + return str_response + elif (self._stall in response): + # Remove ack from response + str_response = self.process_raw_response( + command=data, response=response.decode() + ) + print(str_response) + return str_response + end = time.time() + if (end-start) > 120: + str_response = b"OK\n" + return str_response + + self.on_retry() + + def on_retry(self) -> None: + """ + Opportunity for derived classes to perform action between retries. Default + behaviour is to wait then re-open the connection. + + Returns: None + """ + time.sleep(DEFAULT_FS_TIMEOUT) + self._stacker_connection.close() + self._stacker_connection.open() + + def process_raw_response(self, command: str, response: str) -> str: + """ + Opportunity for derived classes to process the raw response. Default + strips white space. + + Args: + command: The sent command. + response: The raw read response minus ack. + + Returns: + processed response. + """ + return response.strip() + + def is_simulator(self)-> bool: + """Is Simulator""" + return False + + def connect(self) -> None: + """Check connection""" + self._stacker_connection.open() + + def disconnect(self) -> None: + """Disconnect from Flex Stacker""" + self._stacker_connection.close() + + def get_device_info(self) -> str: + """Get the serial number of the flex stacker unit""" + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.DEVICE_INFO) + print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + return response + + def get_device_firmware_version(self) -> str: + """Get the firmware version of the flex stacker unit""" + firmware_version = self.get_device_info().split()[1].split(':')[1] + return firmware_version + + def get_device_serial_number(self) -> str: + """Get the serial number of the flex stacker unit""" + serial_number = self.get_device_info().split()[3].split(':')[1] + return serial_number + + def set_device_serial_number(self, serial_number) -> None: + """Set the serial number of the flex stacker unit""" + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.SET_SERIAL_NUM).add_element(serial_number) + print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + def enable_motor(self, axis: AXIS): + """Enables a Axis motor + Args: + command: Axis + """ + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.ENABLE_MOTOR + ).add_element(axis.upper()) + print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + def disable_motor(self, axis: AXIS): + """Enables a Axis motor + Args: + command: Axis + """ + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.DISABLE_MOTOR + ).add_element(axis.upper()) + print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + def get_sensor_states(self): + """Returns the limit switch status""" + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.LIMITSWITCH_STATUS + ) + # print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + return self.sensor_parse(response) + + def get_platform_sensor_states(self) -> str: + """Returns the limit switch status""" + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.PLATFORM_STATUS + ) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + return self.sensor_parse(response) + + def get_settings(self) -> str: + """Not Implemented yet""" + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.READ_SET_SETTINGS + ) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('CMD: rrr') + + return response + + def sensor_parse(self, cmd): + """ + The response of limit switch command returns a string that needs to be parse. + Below is the following example of the response from the firmware. + + Example: 2024-09-03 14:58:03.135606 Rx <== M119 XE:0 XR:0 ZE:0 ZR:0 LR:0 LH:1 OK + """ + + punctuation = [':'] + i_tracker = 0 + switch_state = [] + final = [] + # print(cmd.index(GCODE.LIMITSWITCH_STATUS)) + for i in cmd: + if i in punctuation: + switch_state.append(i_tracker) + if len(switch_state) == 1: + lsw = cmd[switch_state[0]+1:switch_state[0]+2] + final.append(lsw) + switch_state = [] + i_tracker += 1 + # print(final) + if GCODE.LIMITSWITCH_STATUS in cmd: + final = self._parse_lsw(final) + elif GCODE.PLATFORM_STATUS in cmd: + final = self._parse_plat(final) + return final + + def _parse_lsw(self, parse_data): + """LSW->X+:0,X-:0,Z+:0,Z-:1,PR:1,PH:1PS->X+1,X-:0""" + "2024-09-03 14:58:03.135606 Rx <== M119 XE:0 XR:0 ZE:0 ZR:0 LR:0 LH:1 OK" + states = {} + states.update({"XE": parse_data[0], + "XR": parse_data[1], + "ZE": parse_data[2], + "ZR": parse_data[3], + "LR": parse_data[4] + }) + return states + + def _parse_plat(self, parse_data): + """LSW->X+:0,X-:0,Z+:0,Z-:1,PR:1,PH:1PS->X+1,X-:0""" + "2024-09-03 14:58:03.135606 Rx <== M119 XE:0 XR:0 ZE:0 ZR:0 LR:0 LH:1 OK" + states = {} + + states.update({"E": parse_data[0], + "R": parse_data[1], + }) + return states + + def set_default(self, value, default): + return value if value is not None else default + + def get_current_motion_params(self, axis)-> str: + """Read current motion parameters""" + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.CURRENT_MOTION_PARMS + ).add_element(axis) + print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('CMD: rrr') + return response + + def move(self, axis: AXIS, distance: float, direction: DIR, + velocity: Optional[float] = None, acceleration: Optional[float] = None, + msd: Optional[float] = None, + current: Optional[float] = None): + if axis == AXIS.X: + current = self.set_default(current, MOVE_CURRENT_X) + self.set_run_current(current, AXIS.X) + velocity = self.set_default(velocity, MOVE_SPEED_X) + acceleration = self.set_default(acceleration, MOVE_ACCELERATION_X) + msd = self.set_default(msd, MAX_SPEED_DISCONTINUITY_X) + elif axis == AXIS.Z: + current = self.set_default(current, MOVE_CURRENT_Z) + self.set_run_current(current, AXIS.Z) + velocity = self.set_default(velocity, MOVE_SPEED_DOWNZ) + acceleration = self.set_default(acceleration, MOVE_ACCELERATION_Z) + msd = self.set_default(msd, MAX_SPEED_DISCONTINUITY_Z) + elif axis == AXIS.L: + current = self.set_default(current, MOVE_CURRENT_L) + self.set_run_current(current, AXIS.L) + velocity = self.set_default(velocity, MOVE_SPEED_L) + acceleration = self.set_default(acceleration, MOVE_ACCELERATION_L) + msd = self.set_default(msd, MAX_SPEED_DISCONTINUITY_L) + else: + raise(f"AXIS not defined!! {axis}") + # if self.current_position['X'] == None or self.current_position['Z'] == None: + # raise(f"Motor must be Home{axis}") + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.MOVE_DIST).add_element( + axis.upper() + + f'{direction}' + + f'{distance}').add_element( + f'V{velocity}' + ).add_element( + f'A{acceleration}' #) + ).add_element( + f'D{msd}') + + #print(c) + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES) + stall_detection = "async ERR403:motor stall error" + if response == stall_detection: + raise(f"Stall Detected on {axis}") + if direction == DIR.POSITIVE and axis == AXIS.X: + self.current_position.update({'X': self.current_position['X'] + distance}) + elif direction == DIR.NEGATIVE and axis == AXIS.X: + self.current_position.update({'X': self.current_position['X'] - distance}) + elif direction == DIR.POSITIVE and axis == AXIS.Z: + self.current_position.update({'Z': self.current_position['Z'] + distance}) + elif direction == DIR.NEGATIVE and axis == AXIS.Z: + self.current_position.update({'Z': self.current_position['Z'] - distance}) + elif direction == DIR.POSITIVE and axis == AXIS.L: + self.current_position.update({'L': self.current_position['L'] + distance}) + elif direction == DIR.NEGATIVE and axis == AXIS.L: + self.current_position.update({'L': self.current_position['L'] - distance}) + else: + raise(f"Not recognized {axis} and {direction}") + #print(self.current_position) + + + def microstepping_move(self, axis: AXIS, distance: float, direction: DIR, speed: float, acceleration: float): + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_element( + axis.upper()).add_element(f'{direction}').add_gcode( + gcode=GCODE.MOVE_IGNORE_LIMIT + ) + self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES) + + def set_microstep(self, axis: AXIS, microstepping: int): + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.SET_MICROSTEPPING).add_element(f'{axis.upper()}{microstepping}') + #print(c) + self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES) + + def home(self, axis: AXIS, direction: DIR, velocity: Optional[float] = None, + acceleration: Optional[float] = None, + current: Optional[float] = None): + # Set this to max current to overcome spring force on platforms + if axis == AXIS.X: + current = self.set_default(current, HOME_CURRENT_X) + #print(f"current set: {current}") + self.set_run_current(current, AXIS.X) + velocity = self.set_default(velocity, self.home_speed) + acceleration = self.set_default(acceleration, self.home_acceleration) + # msd = self.set_default(msd, MAX_SPEED_DISCONTINUITY_X) + elif axis == AXIS.Z: + current = self.set_default(current, HOME_CURRENT_Z) + self.set_run_current(current, AXIS.Z) + velocity = self.set_default(velocity, self.home_speed) + acceleration = self.set_default(acceleration, self.home_acceleration) + self.set_ihold_current(1.8, AXIS.Z) + # msd = self.set_default(msd, MAX_SPEED_DISCONTINUITY_Z) + elif axis == AXIS.L: + current = self.set_default(current, HOME_CURRENT_L) + self.set_run_current(current, AXIS.L) + velocity = self.set_default(velocity, self.home_speed_l) + acceleration = self.set_default(acceleration, self.home_acceleration_l) + #print(velocity) + #print(acceleration) + # msd = self.set_default(msd, MAX_SPEED_DISCONTINUITY_L) + else: + raise(f"AXIS not defined!! {axis}") + # G5 X[dir: 0|1] V100 A50 + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.MOVE_LS + ).add_element( + axis.upper() + + direction).add_element( + f'V{velocity}' + ).add_element( + f'A{acceleration}' + ) + #print(c) + self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES) + if direction == DIR.POSITIVE_HOME and axis == AXIS.X: + self.current_position.update({'X': TOTAL_TRAVEL_X}) + elif direction == DIR.NEGATIVE_HOME and axis == AXIS.X: + self.current_position.update({'X': 0}) + elif direction == DIR.POSITIVE_HOME and axis == AXIS.Z: + self.current_position.update({'Z': TOTAL_TRAVEL_Z}) + elif direction == DIR.NEGATIVE_HOME and axis == AXIS.Z: + self.current_position.update({'Z': 0}) + elif direction == DIR.NEGATIVE_HOME and axis == AXIS.L: + self.current_position.update({'L': 0}) + else: + raise(f"Not recognized {axis} and {direction}") + #print(self.current_position) + + def convert_current_to_binary(self, current: float) -> bin: + # fixed_point_constant = 1398894 + fixed_point_constant = 1419610 + if current > 1.5: + current = 1.5 + message_current = int((current)*2**16) + print(message_current) + shifted_current_cs = fixed_point_constant*message_current + print(shifted_current_cs) + current_cs = (shifted_current_cs >> 32) - 1 + print(current_cs) + if current_cs > 31: + current_cs = 31 + current = '0b'+ str(bin(current_cs).replace("0b", '')).zfill(5) + return current + + def set_ihold_current(self, current: float, axis: AXIS) -> str: + """ + M907 - Set axis hold current in Amps ex: M907 X0.5 + M909 - Set microstepping using power of 2 ex: M90 Z2 = 2^2 microstepping""" + # current = self.convert_current_to_binary(current) + # print(current) + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.SET_IHOLD_CURRENT + ).add_element(axis + f'{current}') + #print(c) + self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES) + + def set_run_current(self, current: float, axis: AXIS) -> str: + """ M906 - Set axis peak run current in Amps ex: M906 X1.5""" + # current = convert_current_to_binary(current) + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.SET_PEAK_CURRENT + ).add_element(axis + f'{current}') + print(c) + self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES) + time.sleep(0.1) + + def close_latch(self, velocity: Optional[float] = None, acceleration: Optional[float] = None): + velocity = self.set_default(velocity, MOVE_SPEED_L) + acceleration = self.set_default(acceleration, HOME_ACCELERATION_L) + states = self.get_sensor_states() + #print(states) + #print(velocity) + #print(acceleration) + cur_position = self.current_position['L'] + #print(cur_position) + # self.home(AXIS.L, DIR.NEGATIVE_HOME, velocity, acceleration) + if cur_position == None: + self.home(AXIS.L, DIR.NEGATIVE_HOME, velocity, acceleration) + elif cur_position != 0: + self.move(AXIS.L, TOTAL_TRAVEL_L-2, DIR.NEGATIVE, MOVE_SPEED_L, MOVE_ACCELERATION_L, MAX_SPEED_DISCONTINUITY_L) + self.home(AXIS.L, DIR.NEGATIVE_HOME, velocity, acceleration) + else: + self.home(AXIS.L, DIR.NEGATIVE_HOME, velocity, acceleration) + + def open_latch(self, distance: Optional[float] = None, + velocity: Optional[float] = None, acceleration: Optional[float] = None, + max_speed_discontinuity: Optional[float] = None): + # distance = self.set_default(distance, LATCH_DISTANCE_MM) + velocity = self.set_default(velocity, MOVE_SPEED_L) + acceleration = self.set_default(acceleration, MOVE_ACCELERATION_L) + msd = self.set_default(max_speed_discontinuity, MAX_SPEED_DISCONTINUITY_L) + self.move(AXIS.L, TOTAL_TRAVEL_L, DIR.POSITIVE, velocity, acceleration, msd) + + def load_labware(self, labware_height: float): + # ----------------Set up the Stacker------------------------ + self.home(AXIS.X, DIR.POSITIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.home(AXIS.Z, DIR.NEGATIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.close_latch() + self.move(AXIS.X, TOTAL_TRAVEL_X-5, DIR.NEGATIVE, self.move_speed_x, self.move_acceleration_x) + self.home(AXIS.X, DIR.NEGATIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.move(AXIS.Z, TOTAL_TRAVEL_Z-(labware_height/2)-10, DIR.POSITIVE, self.move_speed_up_z/4, self.move_acceleration_z) + # #------------------- transfer ----------------------------- + self.open_latch() + self.move(AXIS.Z, (labware_height/2), DIR.POSITIVE, self.move_speed_up_z/2, self.move_acceleration_z) + self.home(AXIS.Z, DIR.POSITIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.close_latch() + self.move(AXIS.Z, TOTAL_TRAVEL_Z-15, DIR.NEGATIVE, self.move_speed_down_z, self.move_acceleration_z) + self.home(AXIS.Z, DIR.NEGATIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.move(AXIS.X, TOTAL_TRAVEL_X-5, DIR.POSITIVE, self.move_speed_x, self.move_acceleration_x) + self.home(AXIS.X, DIR.POSITIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + + def unload_labware(self, labware_height: float): + axis_swap_approach_mm = 10 + # ----------------Set up the Stacker------------------------ + self.home(AXIS.X, DIR.POSITIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.home(AXIS.Z, DIR.NEGATIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.close_latch() + self.move(AXIS.X, TOTAL_TRAVEL_X-5, DIR.NEGATIVE, self.move_speed_x, self.move_acceleration_x) + self.home(AXIS.X, DIR.NEGATIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.move(AXIS.Z, TOTAL_TRAVEL_Z-5, DIR.POSITIVE, self.move_speed_up_z, self.move_acceleration_z) + self.home(AXIS.Z, DIR.POSITIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + # #------------------- transfer ----------------------------- + self.open_latch() + self.move(AXIS.Z, (labware_height/2), DIR.NEGATIVE, self.move_speed_down_z, self.move_acceleration_z) + self.close_latch() + self.move(AXIS.Z, TOTAL_TRAVEL_Z-(labware_height/2+axis_swap_approach_mm), DIR.NEGATIVE, self.move_speed_down_z, self.move_acceleration_z) + self.home(AXIS.Z, DIR.NEGATIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + self.move(AXIS.X, TOTAL_TRAVEL_X-5, DIR.POSITIVE, self.move_speed_x, self.move_acceleration_x) + self.home(AXIS.X, DIR.POSITIVE_HOME, HOME_SPEED, HOME_ACCELERATION) + + def write_to_motor_drive(self, register, data): + """ + Example: M921 X32528 103689 + Write Motor Driver Register + + input: the axis, the address, and the contents of the register you want to read + """ + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.WRITE_TO_REGISTER + ).add_element(axis.upper()).add_element(register).add_element(data) + #print(c) + + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + def read_from_motor_drive(self, register): + """ + Example: M921 X32528 103689 + Write Motor Driver Register + + input: the axis, the address, and the contents of the register you want to read + """ + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.WRITE_TO_REGISTER + ).add_element(axis.upper()).add_element(register) + print(c) + + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + def enable_SG(self, axis: AXIS, sg_value: int, enable: bool): + """ + Enable StallGuard and set SGT + + *T: StallGuard Threshold (SGT), (-64-63) + + *: optional params, default/previously set values will be used if + unspecified. You can query the current SGT value with M911 Z + M910 Z[enable: 0|1] T2 + """ + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.STALLGUARD + ).add_element(f'{axis.upper()}{int(enable)}').add_element(f'T{sg_value}') + #print(c) + + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') + + def read_SG_value(self, axis: AXIS): + """ + Get the StallGuard (SGT) value + + M911 Z1 T2 + """ + c = CommandBuilder(terminator=FS_COMMAND_TERMINATOR).add_gcode( + gcode=GCODE.GET_STALLGUARD_VAL + ).add_element(axis.upper()) + #print(c) + + response = self.send_command(command=c, retries=DEFAULT_COMMAND_RETRIES).strip('OK') diff --git a/api/src/opentrons/drivers/stacker/slas_demo.py b/api/src/opentrons/drivers/stacker/slas_demo.py new file mode 100644 index 00000000000..3663e5c84cd --- /dev/null +++ b/api/src/opentrons/drivers/stacker/slas_demo.py @@ -0,0 +1,121 @@ +from opentrons.protocol_api import ProtocolContext, Labware +from opentrons.drivers.stacker.flex_stacker_driver import FlexStacker, LABWARE_Z_HEIGHT +from typing import Optional, List + +metadata = {"protocolName": "Flex Stacker SLAS Demo"} +requirements = {"robotType": "Flex", "apiLevel": "2.17"} + +CYCLES = 1000 +STACKER_HEIGHT = 30 +TRASH_HEIGHT = 50 +SEAL_HEIGHT = 9 +LID_FIXTURE_HEIGHT = 1 + + +class StackerModule: + stacker: Optional[FlexStacker] + STACKER_GRIPPER_OFFSET = {"x": -3, "y": 0, "z": 12} + LABWARE_Z_OFFSET = { + "opentrons_flex_96_tiprack_1000ul": LABWARE_Z_HEIGHT.OPENTRONS_TIPRACKS, + "opentrons_96_wellplate_200ul_pcr_full_skirt": LABWARE_Z_HEIGHT.BIORAD_HARDSHELL_PCR, + } + + def __init__( + self, serial_number: str, labware_name: str, slot: str, protocol: ProtocolContext + ): + self.stacker = ( + FlexStacker.create_from_sn(serial_number) if not protocol.is_simulating() else None + ) + self.labware_name = labware_name + self.slot = slot + self.protocol = protocol + + def unload_and_move_labware(self, new_location: Labware | str) -> Labware: + lw = self.protocol.load_labware(self.labware_name, self.slot) + if self.stacker: + self.stacker.unload_labware(self.LABWARE_Z_OFFSET[self.labware_name]) + self.protocol.move_labware( + lw, + new_location, + use_gripper=True, + pick_up_offset=self.STACKER_GRIPPER_OFFSET, + ) + del self.protocol.deck[self.slot] + return lw + + def move_and_store_labware(self, lw: Labware) -> None: + self.protocol.move_labware( + lw, self.slot, use_gripper=True, drop_offset=self.STACKER_GRIPPER_OFFSET + ) + if self.stacker: + self.stacker.load_labware( + self.LABWARE_Z_OFFSET[self.labware_name] + ) + del self.protocol.deck[self.slot] + + +def run(protocol: ProtocolContext) -> None: + hardware = protocol._hw_manager.hardware + hardware.cache_instruments() + + tiprack_stacker = StackerModule( + serial_number="PS241204SZEVT01", + labware_name="opentrons_flex_96_tiprack_1000ul", + slot="C4", + protocol=protocol, + ) + plate_stacker = StackerModule( + serial_number="PS241204SZEVT01", + labware_name="opentrons_96_wellplate_200ul_pcr_full_skirt", + slot="D4", + protocol=protocol, + ) + + # # Thermocycler in A1 + thermocycler = protocol.load_module("thermocyclerModuleV2") + thermocycler.open_lid() + + # Tiprack adapaters in A2, A3, B2 + tiprack_adapters = [ + protocol.load_adapter("opentrons_flex_96_tiprack_adapter", slot) + for slot in ["A2", "A3", "B2"] + ] + # Dispoable lid stack on Deck Riser in C1 + deck_riser = protocol.load_adapter("opentrons_flex_deck_riser", "C1") + lids: List[Labware] = [ + deck_riser.load_labware("opentrons_tough_pcr_auto_sealing_lid") + ] + for _ in range(3): + lids.append(lids[-1].load_labware("opentrons_tough_pcr_auto_sealing_lid")) + + # MagBlock in D2 + magnetic_block = protocol.load_module("magneticBlockV1", "D2") + + """Unload and move tipracks""" + tipracks: List[Labware] = [] + for adapter in tiprack_adapters: + tiprack = tiprack_stacker.unload_and_move_labware(adapter) + tipracks.append(tiprack) + + """Unload and move plate""" + plates: List[Labware] = [] + plate_dests = ["C2", magnetic_block, thermocycler] + for dest in plate_dests: + plate = plate_stacker.unload_and_move_labware(dest) + plates.append(plate) + + """Move disposable lid on thermocycler plate""" + protocol.move_labware(lids[-1], plates[-1], use_gripper=True) + + """Remove disposable lid from thermocycler plate""" + protocol.move_labware(lids[-1], lids[-2], use_gripper=True) + + """Store plates in stacker""" + plates.reverse() + for plate in plates: + plate_stacker.move_and_store_labware(plate) + + """Store tipracks in stacker""" + tipracks.reverse() + for tiprack in tipracks: + tiprack_stacker.move_and_store_labware(tiprack)