diff --git a/docs/Config_Reference.md b/docs/Config_Reference.md index e64eeee78..b825dbd75 100644 --- a/docs/Config_Reference.md +++ b/docs/Config_Reference.md @@ -4835,6 +4835,7 @@ run_current: #driver_SEDN: 0 #driver_SEIMIN: 0 #driver_SFILT: 0 +#driver_SG4_THRS: 0 #driver_SG4_ANGLE_OFFSET: 1 # Set the given register during the configuration of the TMC2240 # chip. This may be used to set custom motor parameters. The @@ -4847,8 +4848,8 @@ run_current: # is "active low" and is thus normally prefaced with "^!". Setting # this creates a "tmc2240_stepper_x:virtual_endstop" virtual pin # which may be used as the stepper's endstop_pin. Doing this enables -# "sensorless homing". (Be sure to also set driver_SGT to an -# appropriate sensitivity value.) The default is to not enable +# "sensorless homing". (Be sure to also set driver_SGT OR driver_SG4_THRS +# to an appropriate sensitivity value.) The default is to not enable # sensorless homing. ``` diff --git a/klippy/extras/homing.py b/klippy/extras/homing.py index 233ad2f42..c51ef7d16 100644 --- a/klippy/extras/homing.py +++ b/klippy/extras/homing.py @@ -325,6 +325,7 @@ def home_rails(self, rails, forcepos, movepos): hmove.homing_move(homepos, hi.speed) finally: self._set_homing_accel(hi.accel, pre_homing=False) + self._set_homing_current(homing_axes, pre_homing=False) needs_rehome = False retract_dist = hi.retract_dist diff --git a/klippy/extras/tmc.py b/klippy/extras/tmc.py index 0219a2bc4..fcf074646 100644 --- a/klippy/extras/tmc.py +++ b/klippy/extras/tmc.py @@ -8,6 +8,8 @@ from klippy import stepper +from . import bulk_sensor + ###################################################################### # Field helpers ###################################################################### @@ -262,6 +264,103 @@ def get_status(self, eventtime=None): return {"drv_status": self.last_drv_fields, "temperature": temp} +###################################################################### +# Record driver status +###################################################################### + + +class TMCStallguardDump: + def __init__(self, config, mcu_tmc): + self.printer = config.get_printer() + self.stepper_name = " ".join(config.get_name().split()[1:]) + self.mcu_tmc = mcu_tmc + self.mcu = self.mcu_tmc.get_mcu() + self.fields = self.mcu_tmc.get_fields() + self.sg2_supp = False + self.sg4_reg_name = None + # It is possible to support TMC2660, just disable it for now + if not self.fields.all_fields.get("DRV_STATUS", None): + return + # Collect driver capabilities + if self.fields.all_fields["DRV_STATUS"].get("sg_result", None): + self.sg2_supp = True + # New drivers have separate register for SG4 result + if self.mcu_tmc.name_to_reg.get("SG_RESULT", 0): + self.sg4_reg_name = "SG_RESULT" + # 2240 supports both SG2 & SG4 + if self.sg4_reg_name is None: + if self.mcu_tmc.name_to_reg.get("SG4_RESULT", 0): + self.sg4_reg_name = "SG4_RESULT" + # TMC2208 + if self.sg2_supp is None and self.sg4_reg_name is None: + return + self.optimized_spi = False + # Bulk API + self.samples = [] + self.query_timer = None + self.error = None + self.batch_bulk = bulk_sensor.BatchBulkHelper( + self.printer, self._dump, self._start, self._stop + ) + api_resp = {"header": ("time", "sg_result", "cs_actual")} + self.batch_bulk.add_mux_endpoint( + "tmc/stallguard_dump", "name", self.stepper_name, api_resp + ) + + def _start(self): + self.error = None + status = self.mcu_tmc.get_register_raw("DRV_STATUS") + if status.get("spi_status"): + self.optimized_spi = True + reactor = self.printer.get_reactor() + self.query_timer = reactor.register_timer(self._query_tmc, reactor.NOW) + + def _stop(self): + self.printer.get_reactor().unregister_timer(self.query_timer) + self.query_timer = None + self.samples = [] + + def _query_tmc(self, eventtime): + sg_result = -1 + cs_actual = -1 + recv_time = eventtime + try: + if self.optimized_spi or self.sg4_reg_name == "SG4_RESULT": + # TMC2130/TMC5160/TMC2240 + status = self.mcu_tmc.get_register_raw("DRV_STATUS") + reg_val = status["data"] + cs_actual = self.fields.get_field("cs_actual", reg_val) + sg_result = self.fields.get_field("sg_result", reg_val) + is_stealth = self.fields.get_field("stealth", reg_val) + recv_time = status["#receive_time"] + if is_stealth and self.sg4_reg_name == "SG4_RESULT": + sg4_ret = self.mcu_tmc.get_register_raw("SG4_RESULT") + sg_result = sg4_ret["data"] + recv_time = sg4_ret["#receive_time"] + else: + # TMC2209 + if self.sg4_reg_name == "SG_RESULT": + sg4_ret = self.mcu_tmc.get_register_raw("SG_RESULT") + sg_result = sg4_ret["data"] + recv_time = sg4_ret["#receive_time"] + except self.printer.command_error as e: + self.error = e + return self.printer.get_reactor().NEVER + print_time = self.mcu.estimated_print_time(recv_time) + self.samples.append((print_time, sg_result, cs_actual)) + if self.optimized_spi: + return eventtime + 0.001 + # UART queried as fast as possible + return eventtime + 0.005 + + def _dump(self, eventtime): + if self.error: + raise self.error + samples = self.samples + self.samples = [] + return {"data": samples} + + ###################################################################### # G-Code command helpers ###################################################################### @@ -274,18 +373,27 @@ def __init__(self, config, mcu_tmc, current_helper): self.name = config.get_name().split()[-1] self.mcu_tmc = mcu_tmc self.current_helper = current_helper - self.echeck_helper = TMCErrorCheck(config, mcu_tmc) self.fields = mcu_tmc.get_fields() - self.read_registers = self.read_translate = None - self.toff = None - self.mcu_phase_offset = None self.stepper = None + # Stepper phase tracking + self.mcu_phase_offset = None + # Stepper enable/disable tracking + self.toff = None self.stepper_enable = self.printer.load_object(config, "stepper_enable") + self.enable_mutex = self.printer.get_reactor().mutex() + # DUMP_TMC support + self.read_registers = self.read_translate = None + # Common tmc helpers + self.echeck_helper = TMCErrorCheck(config, mcu_tmc) + self.record_helper = TMCStallguardDump(config, mcu_tmc) + # Set microstep config options + TMCMicrostepHelper(config, mcu_tmc) + # Register callbacks self.printer.register_event_handler( "stepper:sync_mcu_position", self._handle_sync_mcu_pos ) self.printer.register_event_handler( - "stepper:set_sdir_inverted", self._handle_sync_mcu_pos + "stepper:set_dir_inverted", self._handle_sync_mcu_pos ) self.printer.register_event_handler( "klippy:mcu_identify", self._handle_mcu_identify @@ -293,8 +401,6 @@ def __init__(self, config, mcu_tmc, current_helper): self.printer.register_event_handler( "klippy:connect", self._handle_connect ) - # Set microstep config options - TMCMicrostepHelper(config, mcu_tmc) # Register commands gcode = self.printer.lookup_object("gcode") gcode.register_mux_command( @@ -456,62 +562,56 @@ def _handle_sync_mcu_pos(self, stepper): # Stepper enable/disable tracking def _do_enable(self, print_time): - try: - if self.toff is not None: - # Shared enable via comms handling - self.fields.set_field("toff", self.toff) - self._init_registers() - did_reset = self.echeck_helper.start_checks() - if did_reset: - self.mcu_phase_offset = None - # Calculate phase offset + if self.toff is not None: + # Shared enable via comms handling + self.fields.set_field("toff", self.toff) + self._init_registers() + did_reset = self.echeck_helper.start_checks() + if did_reset: + self.mcu_phase_offset = None + # Calculate phase offset + if self.mcu_phase_offset is not None: + return + gcode = self.printer.lookup_object("gcode") + with gcode.get_mutex(): if self.mcu_phase_offset is not None: return - gcode = self.printer.lookup_object("gcode") - with gcode.get_mutex(): - if self.mcu_phase_offset is not None: - return - logging.info( - "Pausing toolhead to calculate %s phase offset", - self.stepper_name, - ) - self.printer.lookup_object("toolhead").wait_moves() - self._handle_sync_mcu_pos(self.stepper) - except self.printer.command_error as e: - self.printer.invoke_shutdown(str(e)) + logging.info( + "Pausing toolhead to calculate %s phase offset", + self.stepper_name, + ) + self.printer.lookup_object("toolhead").wait_moves() + self._handle_sync_mcu_pos(self.stepper) def _do_disable(self, print_time): - try: - if self.toff is not None: - val = self.fields.set_field("toff", 0) - reg_name = self.fields.lookup_register("toff") - self.mcu_tmc.set_register(reg_name, val, print_time) - self.echeck_helper.stop_checks() - except self.printer.command_error as e: - self.printer.invoke_shutdown(str(e)) + if self.toff is not None: + val = self.fields.set_field("toff", 0) + reg_name = self.fields.lookup_register("toff") + self.mcu_tmc.set_register(reg_name, val, print_time) + self.echeck_helper.stop_checks() + def _handle_stepper_enable(self, print_time, is_enable): + def enable_disable_cb(eventtime): + try: + with self.enable_mutex: + if is_enable: + self._do_enable(print_time) + else: + self._do_disable(print_time) + except self.printer.command_error as e: + self.printer.invoke_shutdown(str(e)) + + self.printer.get_reactor().register_callback(enable_disable_cb) + + # Initial startup handling def _handle_mcu_identify(self): # Lookup stepper object force_move = self.printer.lookup_object("force_move") self.stepper = force_move.lookup_stepper(self.stepper_name) self.stepper.set_tmc_current_helper(self.current_helper) - # Note pulse duration and step_both_edge optimizations available self.stepper.setup_default_pulse_duration(0.000000100, True) - def _handle_stepper_enable(self, print_time, is_enable): - if is_enable: - - def cb(ev): - return self._do_enable(print_time) - - else: - - def cb(ev): - return self._do_disable(print_time) - - self.printer.get_reactor().register_callback(cb) - def _handle_connect(self): # Check if using step on both edges optimization pulse_duration, step_both_edge = self.stepper.get_pulse_duration() @@ -621,8 +721,8 @@ def __init__(self, config, mcu_tmc): self.diag_pin = config.get("diag_pin", None) self.diag_pin_field = None self.mcu_endstop = None - self.en_pwm = False - self.pwmthrs = self.coolthrs = self.thigh = 0 + self._dirty_regs = collections.OrderedDict() + self._prev_state = collections.OrderedDict() # Register virtual_endstop pin name_parts = config.get_name().split() ppins = self.printer.lookup_object("pins") @@ -647,57 +747,54 @@ def setup_pin(self, pin_type, pin_params): self.mcu_endstop = ppins.setup_pin("endstop", self.diag_pin) return self.mcu_endstop + def _set_field(self, field_name, value): + self._prev_state[field_name] = self.fields.get_field(field_name) + reg_name = self.fields.lookup_register(field_name) + self._dirty_regs[reg_name] = self.fields.set_field(field_name, value) + + def _send_fields(self): + for reg, val in self._dirty_regs.items(): + self.mcu_tmc.set_register(reg, val) + self._dirty_regs.clear() + def handle_homing_move_begin(self, hmove): if self.mcu_endstop not in hmove.get_mcu_endstops(): return + sg4_thrs = 0 + if self.fields.lookup_register("sg4_thrs", None) is not None: + sg4_thrs = self.fields.get_field("sg4_thrs") # Enable/disable stealthchop - self.pwmthrs = self.fields.get_field("tpwmthrs") reg = self.fields.lookup_register("en_pwm_mode", None) if reg is None: # On "stallguard4" drivers, "stealthchop" must be enabled - self.en_pwm = not self.fields.get_field("en_spreadcycle") - tp_val = self.fields.set_field("tpwmthrs", 0) - self.mcu_tmc.set_register("TPWMTHRS", tp_val) - val = self.fields.set_field("en_spreadcycle", 0) + self._set_field("tpwmthrs", 0) + self._set_field("en_spreadcycle", 0) + elif sg4_thrs: + # TMC2240 using SG4, "stealthchop" must be enabled + self._set_field("en_pwm_mode", 1) + self._set_field("tpwmthrs", 0) + self._set_field(self.diag_pin_field, 1) else: # On earlier drivers, "stealthchop" must be disabled - self.en_pwm = self.fields.get_field("en_pwm_mode") - self.fields.set_field("en_pwm_mode", 0) - val = self.fields.set_field(self.diag_pin_field, 1) - self.mcu_tmc.set_register("GCONF", val) + self._set_field("en_pwm_mode", 0) + self._set_field(self.diag_pin_field, 1) # Enable tcoolthrs (if not already) - self.coolthrs = self.fields.get_field("tcoolthrs") - if self.coolthrs == 0: - tc_val = self.fields.set_field("tcoolthrs", 0xFFFFF) - self.mcu_tmc.set_register("TCOOLTHRS", tc_val) + if self.fields.get_field("tcoolthrs") == 0: + self._set_field("tcoolthrs", 0xFFFFF) # Disable thigh reg = self.fields.lookup_register("thigh", None) if reg is not None: - self.thigh = self.fields.get_field("thigh") - th_val = self.fields.set_field("thigh", 0) - self.mcu_tmc.set_register(reg, th_val) + self._set_field("thigh", 0) + self._send_fields() def handle_homing_move_end(self, hmove): if self.mcu_endstop not in hmove.get_mcu_endstops(): return - # Restore stealthchop/spreadcycle - reg = self.fields.lookup_register("en_pwm_mode", None) - if reg is None: - tp_val = self.fields.set_field("tpwmthrs", self.pwmthrs) - self.mcu_tmc.set_register("TPWMTHRS", tp_val) - val = self.fields.set_field("en_spreadcycle", not self.en_pwm) - else: - self.fields.set_field("en_pwm_mode", self.en_pwm) - val = self.fields.set_field(self.diag_pin_field, 0) - self.mcu_tmc.set_register("GCONF", val) - # Restore tcoolthrs - tc_val = self.fields.set_field("tcoolthrs", self.coolthrs) - self.mcu_tmc.set_register("TCOOLTHRS", tc_val) - # Restore thigh - reg = self.fields.lookup_register("thigh", None) - if reg is not None: - th_val = self.fields.set_field("thigh", self.thigh) - self.mcu_tmc.set_register(reg, th_val) + # Restore previous state + for field, val in list(self._prev_state.items()): + self._set_field(field, val) + self._send_fields() + self._prev_state.clear() ###################################################################### diff --git a/klippy/extras/tmc2130.py b/klippy/extras/tmc2130.py index 76009ff56..ae23335aa 100644 --- a/klippy/extras/tmc2130.py +++ b/klippy/extras/tmc2130.py @@ -296,14 +296,22 @@ def reg_read(self, reg, chain_pos): cmd = self._build_cmd([reg, 0x00, 0x00, 0x00, 0x00], chain_pos) self.spi.spi_send(cmd) if self.printer.get_start_args().get("debugoutput") is not None: - return 0 + return { + "spi_status": 0, + "data": 0, + "#receive_time": 0.0, + } params = self.spi.spi_transfer(cmd) pr = bytearray(params["response"]) pr = pr[ (self.chain_len - chain_pos) * 5 : (self.chain_len - chain_pos + 1) * 5 ] - return (pr[1] << 24) | (pr[2] << 16) | (pr[3] << 8) | pr[4] + return { + "spi_status": pr[0], + "data": (pr[1] << 24) | (pr[2] << 16) | (pr[3] << 8) | pr[4], + "#receive_time": params["#receive_time"], + } def reg_write(self, reg, val, chain_pos, print_time=None): minclock = 0 @@ -331,6 +339,9 @@ def reg_write(self, reg, val, chain_pos, print_time=None): ] return (pr[1] << 24) | (pr[2] << 16) | (pr[3] << 8) | pr[4] + def get_mcu(self): + return self.spi.get_mcu() + # Helper to setup an spi daisy chain bus from settings in a config section def lookup_tmc_spi_chain(config): @@ -371,11 +382,22 @@ def __init__(self, config, name_to_reg, fields, tmc_frequency): def get_fields(self): return self.fields - def get_register(self, reg_name): + def get_register_raw(self, reg_name): reg = self.name_to_reg[reg_name] with self.mutex: - read = self.tmc_spi.reg_read(reg, self.chain_pos) - return read + resp = self.tmc_spi.reg_read(reg, self.chain_pos) + return resp + + def decode_spi_status(spi_status): + return { + "standstill": spi_status >> 3 & 0x1, + "sg2": spi_status >> 2 & 0x1, + "driver_error": spi_status >> 1 & 0x1, + "reset_flag": spi_status & 0x1, + } + + def get_register(self, reg_name): + return self.get_register_raw(reg_name)["data"] def set_register(self, reg_name, val, print_time=None): reg = self.name_to_reg[reg_name] @@ -391,6 +413,9 @@ def set_register(self, reg_name, val, print_time=None): def get_tmc_frequency(self): return self.tmc_frequency + def get_mcu(self): + return self.tmc_spi.get_mcu() + ###################################################################### # TMC2130 printer object diff --git a/klippy/extras/tmc2240.py b/klippy/extras/tmc2240.py index debf43f30..daf993813 100644 --- a/klippy/extras/tmc2240.py +++ b/klippy/extras/tmc2240.py @@ -70,6 +70,7 @@ "THIGH", "ADC_VSUPPLY_AIN", "ADC_TEMP", + "OTW_OV_VTH", "MSCNT", "MSCURACT", "CHOPCONF", @@ -256,6 +257,10 @@ "adc_temp": (lambda v: "0x%04x(%.1fC)" % (v, ((v - 2038) / 7.7))), "adc_vsupply": (lambda v: "0x%04x(%.3fV)" % (v, v * 0.009732)), "adc_ain": (lambda v: "0x%04x(%.3fmV)" % (v, v * 0.3052)), + "overvoltage_vth": (lambda v: "0x%04x(%.3fV)" % (v, v * 0.009732)), + "overtempprewarning_vth": ( + lambda v: "0x%04x(%.1fC)" % (v, ((v - 2038) / 7.7)) + ), } ) @@ -451,6 +456,7 @@ def __init__(self, config): # TPOWERDOWN set_config_field(config, "tpowerdown", 10) # SG4_THRS + set_config_field(config, "sg4_thrs", 0) set_config_field(config, "sg4_angle_offset", 1) # DRV_CONF set_config_field(config, "slope_control", 0) diff --git a/klippy/extras/tmc2660.py b/klippy/extras/tmc2660.py index 5bda093a9..b98e668de 100644 --- a/klippy/extras/tmc2660.py +++ b/klippy/extras/tmc2660.py @@ -216,11 +216,14 @@ def __init__(self, config, name_to_reg, fields): def get_fields(self): return self.fields - def get_register(self, reg_name): + def get_register_raw(self, reg_name): new_rdsel = ReadRegisters.index(reg_name) reg = self.name_to_reg["DRVCONF"] if self.printer.get_start_args().get("debugoutput") is not None: - return 0 + return { + "data": 0, + "#receive_time": 0.0, + } with self.mutex: old_rdsel = self.fields.get_field("rdsel") val = self.fields.set_field("rdsel", new_rdsel) @@ -230,7 +233,13 @@ def get_register(self, reg_name): self.spi.spi_send(msg) params = self.spi.spi_transfer(msg) pr = bytearray(params["response"]) - return (pr[0] << 16) | (pr[1] << 8) | pr[2] + return { + "data": (pr[0] << 16) | (pr[1] << 8) | pr[2], + "#receive_time": params["#receive_time"], + } + + def get_register(self, reg_name): + return self.get_register_raw(reg_name)["data"] def set_register(self, reg_name, val, print_time=None): minclock = 0 @@ -244,6 +253,9 @@ def set_register(self, reg_name, val, print_time=None): def get_tmc_frequency(self): return None + def get_mcu(self): + return self.spi.get_mcu() + ###################################################################### # TMC2660 printer object diff --git a/klippy/extras/tmc_uart.py b/klippy/extras/tmc_uart.py index 0d7f8faa8..d68f71137 100644 --- a/klippy/extras/tmc_uart.py +++ b/klippy/extras/tmc_uart.py @@ -220,7 +220,10 @@ def reg_read(self, instance_id, addr, reg): self.analog_mux.activate(instance_id) msg = self._encode_read(0xF5, addr, reg) params = self.tmcuart_send_cmd.send([self.oid, msg, 10]) - return self._decode_read(reg, params["read"]) + return { + "data": self._decode_read(reg, params["read"]), + "#receive_time": params["#receive_time"], + } def reg_write(self, instance_id, addr, reg, val, print_time=None): minclock = 0 @@ -231,6 +234,9 @@ def reg_write(self, instance_id, addr, reg, val, print_time=None): msg = self._encode_write(0xF5, addr, reg | 0x80, val) self.tmcuart_send_cmd.send([self.oid, msg, 0], minclock=minclock) + def get_mcu(self): + return self.mcu + # Lookup a (possibly shared) tmc uart def lookup_tmc_uart_bitbang(config, max_addr): @@ -280,19 +286,25 @@ def get_fields(self): def _do_get_register(self, reg_name): reg = self.name_to_reg[reg_name] if self.printer.get_start_args().get("debugoutput") is not None: - return 0 + return { + "data": 0, + "#receive_time": 0.0, + } for retry in range(5): val = self.mcu_uart.reg_read(self.instance_id, self.addr, reg) - if val is not None: + if val["data"] is not None: return val raise self.printer.command_error( "Unable to read tmc uart '%s' register %s" % (self.name, reg_name) ) - def get_register(self, reg_name): + def get_register_raw(self, reg_name): with self.mutex: return self._do_get_register(reg_name) + def get_register(self, reg_name): + return self.get_register_raw(reg_name)["data"] + def set_register(self, reg_name, val, print_time=None): reg = self.name_to_reg[reg_name] if self.printer.get_start_args().get("debugoutput") is not None: @@ -301,11 +313,11 @@ def set_register(self, reg_name, val, print_time=None): for retry in range(5): ifcnt = self.ifcnt if ifcnt is None: - self.ifcnt = ifcnt = self._do_get_register("IFCNT") + self.ifcnt = ifcnt = self._do_get_register("IFCNT")["data"] self.mcu_uart.reg_write( self.instance_id, self.addr, reg, val, print_time ) - self.ifcnt = self._do_get_register("IFCNT") + self.ifcnt = self._do_get_register("IFCNT")["data"] if self.ifcnt == (ifcnt + 1) & 0xFF: return raise self.printer.command_error( @@ -314,3 +326,6 @@ def set_register(self, reg_name, val, print_time=None): def get_tmc_frequency(self): return self.tmc_frequency + + def get_mcu(self): + return self.mcu_uart.get_mcu() diff --git a/scripts/motan/data_logger.py b/scripts/motan/data_logger.py index db7f91800..03873fcd4 100755 --- a/scripts/motan/data_logger.py +++ b/scripts/motan/data_logger.py @@ -197,6 +197,13 @@ def handle_subscribe(self, msg, raw_msg): lname = "%s:%s" % (st, aname) qcmd = "%s/dump_%s" % (st, st) self.send_subscribe(lname, qcmd, {"sensor": aname}) + if cfgname.startswith("tmc"): + driver = " ".join(cfgname.split()[1:]) + self.send_subscribe( + "stallguard:" + driver, + "tmc/stallguard_dump", + {"name": driver}, + ) def handle_dump(self, msg, raw_msg): msg_id = msg["id"] diff --git a/scripts/motan/readlog.py b/scripts/motan/readlog.py index 7a3840335..2e4cbb2c2 100644 --- a/scripts/motan/readlog.py +++ b/scripts/motan/readlog.py @@ -272,6 +272,77 @@ def _pull_block(self, req_time): LogHandlers["stepq"] = HandleStepQ +# Extract tmc currect and stallguard data from the log +class HandleStallguard: + SubscriptionIdParts = 2 + ParametersMin = 2 + ParametersMax = 2 + DataSets = [ + ( + "stallguard(,sg_result)", + "Stallguard result of the given stepper driver", + ), + ( + "stallguard(,cs_actual)", + "Current level result of the given stepper driver", + ), + ] + + def __init__(self, lmanager, name, name_parts): + self.name = name + self.stepper_name = name_parts[1] + self.filter = name_parts[2] + self.jdispatch = lmanager.get_jdispatch() + self.data = [] + self.ret = None + self.driver_name = "" + for k in lmanager.get_initial_status()["configfile"]["settings"]: + if not k.startswith("tmc"): + continue + if k.endswith(self.stepper_name): + self.driver_name = k + break + # Current decode + self.status_tracker = lmanager.get_status_tracker() + self.next_status_time = 0.0 + self.irun = 0 + + def get_label(self): + label = "%s %s %s" % ( + self.driver_name, + self.stepper_name, + self.filter, + ) + if self.filter == "sg_result": + return {"label": label, "units": "Stallguard"} + elif self.filter == "cs_actual": + return {"label": label, "units": "CS Actual"} + + # Search datapoint in dataset extrapolate in between + def pull_data(self, req_time): + while 1: + if len(self.data) == 0: + jmsg = self.jdispatch.pull_msg(req_time, self.name) + if jmsg is None: + return + self.data = jmsg["data"] + if self.ret is None and len(self.data) > 0: + time, sg_result, cs_actual = self.data.pop(0) + self.ret = { + "time": time, + "sg_result": sg_result, + "cs_actual": cs_actual, + } + if self.ret: + time = self.ret["time"] + if req_time <= time: + return self.ret[self.filter] + self.ret = None + + +LogHandlers["stallguard"] = HandleStallguard + + # Extract stepper motor phase position class HandleStepPhase: SubscriptionIdParts = 0 diff --git a/test/test_tmc_homing.py b/test/test_tmc_homing.py new file mode 100644 index 000000000..ced072924 --- /dev/null +++ b/test/test_tmc_homing.py @@ -0,0 +1,347 @@ +"""Behavior tests for TMCVirtualPinHelper and BaseTMCCurrentHelper. + +These exercise the sensorless homing save/restore contract and the Kalico +home_current current-switching logic. The goal is to lock down observable +behavior so it survives the upstream "simplify homing start/end" refactor +(Klipper PR #7193) and the TMCCommandHelper cleanup (Klipper PR #7154). +""" + +from __future__ import annotations + +import collections + +import pytest + +from klippy.extras import tmc2130, tmc2209, tmc2240 +from klippy.extras.tmc import ( + BaseTMCCurrentHelper, + FieldHelper, + TMCVirtualPinHelper, +) + + +class FakeMCUTMC: + """Captures set_register calls and returns field values on demand.""" + + def __init__(self, fields: FieldHelper): + self.fields = fields + self.writes: list[tuple[str, int]] = [] + + def get_fields(self): + return self.fields + + def set_register(self, reg_name, val, print_time=None): + self.writes.append((reg_name, val)) + # mirror the write in the register store so subsequent get_field reflects it + self.fields.registers[reg_name] = val + + +def _make_fields(driver_module) -> FieldHelper: + return FieldHelper( + driver_module.Fields, + signed_fields=getattr(driver_module, "SignedFields", []), + ) + + +def _make_vph(driver_module, diag_pin_field): + """Build a TMCVirtualPinHelper bypassing __init__ (which needs a printer). + + Initializes both the pre-refactor scalar attributes and the post-refactor + OrderedDict caches so the tests run on either implementation without + needing to be rewritten. + """ + fields = _make_fields(driver_module) + mcu_tmc = FakeMCUTMC(fields) + vph = object.__new__(TMCVirtualPinHelper) + vph.mcu_tmc = mcu_tmc + vph.fields = fields + vph.diag_pin = "PA1" + vph.diag_pin_field = diag_pin_field + # mcu_endstop is compared with identity to hmove.get_mcu_endstops() + vph.mcu_endstop = object() + # pre-refactor attributes (ignored by the new implementation) + vph.en_pwm = False + vph.pwmthrs = 0 + vph.coolthrs = 0 + vph.thigh = 0 + # post-refactor attributes (ignored by the old implementation) + vph._dirty_regs = collections.OrderedDict() + vph._prev_state = collections.OrderedDict() + return vph, fields, mcu_tmc + + +class FakeHMove: + def __init__(self, endstops): + self._endstops = endstops + + def get_mcu_endstops(self): + return self._endstops + + +# --------------------------------------------------------------------------- +# TMCVirtualPinHelper — round-trip invariant +# --------------------------------------------------------------------------- + + +def _snapshot(fields: FieldHelper, names): + return {n: fields.get_field(n) for n in names} + + +@pytest.mark.parametrize( + "driver_module,diag_pin_field,extra_fields", + [ + (tmc2130, "diag1_stall", []), + (tmc2240, "diag1_stall", ["en_pwm_mode"]), + ], + ids=["tmc2130", "tmc2240"], +) +def test_homing_begin_end_round_trip( + driver_module, diag_pin_field, extra_fields +): + """After begin+end, every touched field must return to its original value.""" + vph, fields, mcu_tmc = _make_vph(driver_module, diag_pin_field) + + # Configure a realistic starting state with NON-default values so we can + # actually see save/restore occur (default zeros would hide bugs). + fields.set_field("tpwmthrs", 0x12345) + fields.set_field("tcoolthrs", 0x6789A) + fields.set_field("thigh", 0x0ABCD) + fields.set_field("en_pwm_mode", 1) + fields.set_field(diag_pin_field, 0) + + touched = [ + "tpwmthrs", + "tcoolthrs", + "thigh", + "en_pwm_mode", + diag_pin_field, + ] + before = _snapshot(fields, touched) + + hmove = FakeHMove([vph.mcu_endstop]) + vph.handle_homing_move_begin(hmove) + vph.handle_homing_move_end(hmove) + + after = _snapshot(fields, touched) + assert after == before, ( + f"Round-trip mismatch: before={before} after={after}" + ) + + +@pytest.mark.parametrize( + "driver_module,diag_pin_field", + [(tmc2130, "diag1_stall"), (tmc2240, "diag1_stall")], + ids=["tmc2130", "tmc2240"], +) +def test_homing_begin_sets_homing_values(driver_module, diag_pin_field): + """During homing, the driver must be put into its stall-safe state.""" + vph, fields, mcu_tmc = _make_vph(driver_module, diag_pin_field) + + fields.set_field("tpwmthrs", 0x12345) + fields.set_field("tcoolthrs", 0) # unset, should be forced to 0xFFFFF + fields.set_field("thigh", 0x0ABCD) + fields.set_field("en_pwm_mode", 1) + fields.set_field(diag_pin_field, 0) + + hmove = FakeHMove([vph.mcu_endstop]) + vph.handle_homing_move_begin(hmove) + + # stealthchop must be disabled on drivers that have en_pwm_mode + assert fields.get_field("en_pwm_mode") == 0 + # diag pin must be routed to stall for this axis + assert fields.get_field(diag_pin_field) == 1 + # thigh must be zero during homing + assert fields.get_field("thigh") == 0 + # tcoolthrs must be forced to the max-sensitivity value since it was 0 + assert fields.get_field("tcoolthrs") == 0xFFFFF + + +def test_homing_preserves_nonzero_tcoolthrs_begin_state(): + """If tcoolthrs was already configured, homing should not override it.""" + vph, fields, mcu_tmc = _make_vph(tmc2130, "diag1_stall") + + fields.set_field("tcoolthrs", 0x555) + fields.set_field("thigh", 0) + fields.set_field("en_pwm_mode", 0) + fields.set_field("diag1_stall", 0) + + hmove = FakeHMove([vph.mcu_endstop]) + vph.handle_homing_move_begin(hmove) + + assert fields.get_field("tcoolthrs") == 0x555, ( + "pre-configured tcoolthrs should survive homing begin" + ) + + +def test_homing_skipped_when_endstop_not_in_move(): + """No register writes when the move doesn't include our endstop.""" + vph, fields, mcu_tmc = _make_vph(tmc2130, "diag1_stall") + + fields.set_field("tpwmthrs", 0x12345) + + hmove = FakeHMove([object()]) # different endstop + vph.handle_homing_move_begin(hmove) + vph.handle_homing_move_end(hmove) + + assert mcu_tmc.writes == [] + assert fields.get_field("tpwmthrs") == 0x12345 + + +# --------------------------------------------------------------------------- +# TMC2209 (stallguard4 / no en_pwm_mode) — round trip +# --------------------------------------------------------------------------- + + +def test_tmc2209_sg4_round_trip(): + """TMC2209 uses en_spreadcycle instead of en_pwm_mode. Must also round-trip.""" + # tmc2209 inherits most fields from tmc2208; ensure we have what we need + if "en_spreadcycle" not in { + f for reg in tmc2209.Fields.values() for f in reg + }: + pytest.skip("tmc2209 does not expose en_spreadcycle in test data") + + fields = _make_fields(tmc2209) + mcu_tmc = FakeMCUTMC(fields) + vph = object.__new__(TMCVirtualPinHelper) + vph.mcu_tmc = mcu_tmc + vph.fields = fields + vph.diag_pin = "PA1" + vph.diag_pin_field = None # SG4 drivers have no diag pin field + vph.mcu_endstop = object() + vph.en_pwm = False + vph.pwmthrs = 0 + vph.coolthrs = 0 + vph.thigh = 0 + vph._dirty_regs = collections.OrderedDict() + vph._prev_state = collections.OrderedDict() + + fields.set_field("tpwmthrs", 0x22222) + fields.set_field("tcoolthrs", 0x33333) + fields.set_field("en_spreadcycle", 1) # was spreadCycle, not stealth + + touched = ["tpwmthrs", "tcoolthrs", "en_spreadcycle"] + before = _snapshot(fields, touched) + + hmove = FakeHMove([vph.mcu_endstop]) + vph.handle_homing_move_begin(hmove) + + # During homing on SG4 drivers, stealthchop must be ON (spreadcycle OFF) + assert fields.get_field("en_spreadcycle") == 0 + assert fields.get_field("tpwmthrs") == 0 + + vph.handle_homing_move_end(hmove) + after = _snapshot(fields, touched) + assert after == before, ( + f"tmc2209 round-trip mismatch: before={before} after={after}" + ) + + +# --------------------------------------------------------------------------- +# BaseTMCCurrentHelper — Kalico home_current logic +# --------------------------------------------------------------------------- + + +class _NoopCurrentHelper(BaseTMCCurrentHelper): + """Bare helper built by hand; bypasses config parsing entirely.""" + + def __init__( + self, + run_current=0.8, + home_current=0.4, + hold_current=0.6, + max_current=2.0, + ): + # bypass BaseTMCCurrentHelper.__init__ (which reads from config) + self.printer = None + self.name = "stepper_x" + self.mcu_tmc = None + self.fields = None + self.sense_resistor = 0.11 + self.max_current = max_current + self.config_run_current = run_current + self.config_hold_current = hold_current + self.config_home_current = home_current + self.current_change_dwell_time = 0.5 + self.req_run_current = run_current + self.req_hold_current = hold_current + self.req_home_current = home_current + self.actual_current = run_current + self.set_current_calls: list[tuple] = [] + + def set_current(self, new_current, hold_current, print_time, force=False): + # record the call, then update actual_current exactly like the real + # set_current does via set_actual_current + self.set_current_calls.append( + (new_current, hold_current, print_time, force) + ) + self.actual_current = new_current + self.req_hold_current = hold_current + + def apply_current(self, print_time): + pass + + +def test_needs_home_current_change_true_when_mismatch(): + ch = _NoopCurrentHelper(run_current=0.8, home_current=0.4) + assert ch.needs_home_current_change() is True + + +def test_needs_home_current_change_false_when_equal(): + ch = _NoopCurrentHelper(run_current=0.8, home_current=0.8) + assert ch.needs_home_current_change() is False + + +def test_set_current_for_homing_pre_switches_to_home_current(): + ch = _NoopCurrentHelper(run_current=0.8, home_current=0.4) + dwell = ch.set_current_for_homing(print_time=1.0, pre_homing=True) + + assert dwell == ch.current_change_dwell_time + assert ch.set_current_calls[-1][0] == pytest.approx(0.4) + assert ch.actual_current == pytest.approx(0.4) + + +def test_set_current_for_homing_post_restores_run_current(): + ch = _NoopCurrentHelper(run_current=0.8, home_current=0.4) + # simulate pre-homing already happened + ch.set_current_for_homing(print_time=1.0, pre_homing=True) + assert ch.actual_current == pytest.approx(0.4) + + dwell = ch.set_current_for_homing(print_time=2.0, pre_homing=False) + + assert dwell == ch.current_change_dwell_time + assert ch.actual_current == pytest.approx(0.8) + + +def test_set_current_for_homing_noop_when_already_at_target(): + ch = _NoopCurrentHelper(run_current=0.8, home_current=0.8) + dwell = ch.set_current_for_homing(print_time=1.0, pre_homing=True) + assert dwell == 0.0 + assert ch.set_current_calls == [] + + +def test_set_home_current_capped_at_max_current(): + ch = _NoopCurrentHelper(home_current=0.4, max_current=1.0) + ch.set_home_current(99.0) + assert ch.req_home_current == pytest.approx(1.0) + + +def test_set_home_current_below_max_passes_through(): + ch = _NoopCurrentHelper(home_current=0.4, max_current=2.0) + ch.set_home_current(0.7) + assert ch.req_home_current == pytest.approx(0.7) + + +def test_post_homing_path_runs_even_if_pre_did_nothing(): + """Regression: after a homing where run_current was already set, + we still need to consider returning to run_current on post.""" + ch = _NoopCurrentHelper(run_current=0.8, home_current=0.8) + # pre: no-op + dwell_pre = ch.set_current_for_homing(print_time=1.0, pre_homing=True) + # some external force moved actual_current off run (simulating the + # sensorless bugfix path where SET_TMC_CURRENT swapped things mid-flight) + ch.actual_current = 0.4 + dwell_post = ch.set_current_for_homing(print_time=2.0, pre_homing=False) + + assert dwell_pre == 0.0 + assert dwell_post == ch.current_change_dwell_time + assert ch.actual_current == pytest.approx(0.8)