diff --git a/src/fixate/drivers/dso/agilent_mso_x.py b/src/fixate/drivers/dso/agilent_mso_x.py index 7de7d04..ea53d09 100644 --- a/src/fixate/drivers/dso/agilent_mso_x.py +++ b/src/fixate/drivers/dso/agilent_mso_x.py @@ -8,9 +8,8 @@ # KEYSIGHT TECHNOLOGIES,DSOX1202G,CN60074190,02.10.2019111333 # KEYSIGHT TECHNOLOGIES,DSO-X 1102G,CN57096441,01.20.2019061038 # AGILENT TECHNOLOGIES,MSO-X 3014A,MY51360314,02.43.2018020635 - - class MSO_X_3000(DSO): + # Regex needs work to detect only the 4 channel scopes we have: REGEX_ID = "(KEYSIGHT|AGILENT) TECHNOLOGIES,[DM]SO-?X" INSTR_TYPE = "VISA" retrys_on_timeout = 1 @@ -22,523 +21,14 @@ def __init__(self, instrument): self._mode = "STOP" self._wave_acquired = False self._triggers_read = 0 + self.reset() self.instrument.query_delay = 0.2 - self._store = {} - self.api = [ - ("source1.ch1", self.store, {"source1": "CHAN1"}), - ("source1.ch2", self.store, {"source1": "CHAN2"}), - ("source1.ch3", self.store, {"source1": "CHAN3"}), - ("source1.ch4", self.store, {"source1": "CHAN4"}), - ("source1.function", self.store, {"source1": "FUNC"}), - ("source1.math", self.store, {"source1": "MATH"}), - ("source1.wmemory1", self.store, {"source1": "WMEM1"}), - ("source1.wmemory2", self.store, {"source1": "WMEM2"}), - ("source2.ch1", self.store, {"source2": "CHAN1"}), - ("source2.ch2", self.store, {"source2": "CHAN2"}), - ("source2.ch3", self.store, {"source2": "CHAN3"}), - ("source2.ch4", self.store, {"source2": "CHAN4"}), - ("source2.function", self.store, {"source1": "FUNC"}), - ("source2.math", self.store, {"source1": "MATH"}), - ("source2.wmemory1", self.store, {"source1": "WMEM1"}), - ("source2.wmemory2", self.store, {"source1": "WMEM2"}), - ("ch1._call", self.write, "CHAN1:DISP {value:d}"), - ("ch2._call", self.write, "CHAN2:DISP {value:d}"), - ("ch3._call", self.write, "CHAN3:DISP {value:d}"), - ("ch4._call", self.write, "CHAN4:DISP {value:d}"), - ("ch1.scale", self.write, "CHAN1:SCAL {value}"), - ("ch2.scale", self.write, "CHAN2:SCAL {value}"), - ("ch3.scale", self.write, "CHAN3:SCAL {value}"), - ("ch4.scale", self.write, "CHAN4:SCAL {value}"), - ("ch1.offset", self.write, "CHAN1:OFFS {value}"), - ("ch2.offset", self.write, "CHAN2:OFFS {value}"), - ("ch3.offset", self.write, "CHAN3:OFFS {value}"), - ("ch4.offset", self.write, "CHAN4:OFFS {value}"), - ("ch1.coupling.ac", self.write, "CHAN1:COUP AC"), - ("ch2.coupling.ac", self.write, "CHAN2:COUP AC"), - ("ch3.coupling.ac", self.write, "CHAN3:COUP AC"), - ("ch4.coupling.ac", self.write, "CHAN4:COUP AC"), - ("ch1.coupling.dc", self.write, "CHAN1:COUP DC"), - ("ch2.coupling.dc", self.write, "CHAN2:COUP DC"), - ("ch3.coupling.dc", self.write, "CHAN3:COUP DC"), - ("ch4.coupling.dc", self.write, "CHAN4:COUP DC"), - ("ch1.probe.attenuation", self.write, "CHAN1:PROB {value}"), - ("ch2.probe.attenuation", self.write, "CHAN2:PROB {value}"), - ("ch3.probe.attenuation", self.write, "CHAN3:PROB {value}"), - ("ch4.probe.attenuation", self.write, "CHAN4:PROB {value}"), - ("time_base.scale", self.write, "TIM:SCAL {value}"), - ("time_base.position", self.write, "TIM:POS {value}"), - ("trigger.mode.edge._call", self.write, "TRIG:MODE EDGE"), - ("trigger.mode.edge.level", self.write, "TRIG:EDGE:LEVEL {value}"), - ("trigger.mode.edge.source.ch1", self.write, "TRIG:EDGE:SOUR CHAN1"), - ("trigger.mode.edge.source.ch2", self.write, "TRIG:EDGE:SOUR CHAN2"), - ("trigger.mode.edge.source.ch3", self.write, "TRIG:EDGE:SOUR CHAN3"), - ("trigger.mode.edge.source.ch4", self.write, "TRIG:EDGE:SOUR CHAN4"), - ("trigger.mode.edge.slope.rising", self.write, "TRIG:EDGE:SLOPE POS"), - ("trigger.mode.edge.slope.falling", self.write, "TRIG:EDGE:SLOPE NEG"), - ("trigger.mode.edge.slope.either", self.write, "TRIG:EDGE:SLOPE EITH"), - ("trigger.mode.edge.slope.alternating", self.write, "TRIG:EDGE:SLOPE ALT"), - ("trigger.sweep.normal", self.write, "TRIG:SWE NORM"), - ("trigger.sweep.auto", self.write, "TRIG:SWE AUTO"), - ("trigger.coupling.ac", self.write, "TRIG:COUP AC"), - ("trigger.coupling.dc", self.write, "TRIG:COUP DC"), - ("trigger.coupling.lf_reject", self.write, "TRIG:COUP LFR"), - ("trigger.hf_reject", self.write, "TRIG:HFR {value}"), - ("acquire.normal", self.write, "ACQ:TYPE NORM"), - ("acquire.peak_detect", self.write, "ACQ:TYPE PEAK"), - ("acquire.averaging", self.write, "ACQ:TYPE AVER;:ACQ:COUN {value}"), - ("acquire.high_resolution", self.write, "ACQ:TYPE HRES"), - ("events.trigger", self.query_bool, ":TER?"), - # Measure - ( - "measure.delay._call", - self.query_after_acquire, - "MEAS:DEL? {self._store[source1]},{self._store[source2]}", - ), - ( - "measure.define.threshold.percent", - self.write, - "MEAS:DEF THR,PERC,{upper},{middle},{lower}", - ), - ( - "measure.define.threshold.absolute", - self.write, - "MEAS:DEF THR,ABS,{upper},{middle},{lower}", - ), - ("measure.delay.edges.rising.rising", self.write, "MEAS:DEF DEL, +1, +1"), - ("measure.delay.edges.rising.falling", self.write, "MEAS:DEF DEL, +1, -1"), - ("measure.delay.edges.falling.rising", self.write, "MEAS:DEF DEL, -1, +1"), - ("measure.delay.edges.falling.falling", self.write, "MEAS:DEF DEL, -1, -1"), - ( - "measure.phase", - self.query_after_acquire, - "MEAS:PHAS? {self._store[source1]},{self._store[source2]}", - ), - ( - "measure.vratio.cycle", - self.query_after_acquire, - "MEAS:VRAT? CYCL,{self._store[source1]},{self._store[source2]}", - ), - ( - "measure.vratio.display", - self.query_after_acquire, - "MEAS:VRAT? DISP,{self._store[source1]},{self._store[source2]}", - ), - # Ch1 Measure - ("measure.counter.ch1", self.query_after_acquire, "MEAS:COUN? CHAN1"), - ("measure.duty.ch1", self.query_after_acquire, "MEAS:DUTY? CHAN1"), - ("measure.fall_time.ch1", self.query_after_acquire, "MEAS:FALL? CHAN1"), - ("measure.rise_time.ch1", self.query_after_acquire, "MEAS:RIS? CHAN1"), - ("measure.frequency.ch1", self.query_after_acquire, "MEAS:FREQ? CHAN1"), - ( - "measure.cnt_edge_rising.ch1", - self.query_after_acquire, - "MEAS:NEDG? CHAN1", - ), - ( - "measure.cnt_edge_falling.ch1", - self.query_after_acquire, - "MEAS:PEDG? CHAN1", - ), - ( - "measure.cnt_pulse_positive.ch1", - self.query_after_acquire, - "MEAS:NPUL? CHAN1", - ), - ( - "measure.cnt_pulse_negative.ch1", - self.query_after_acquire, - "MEAS:PPUL? CHAN1", - ), - ("measure.period.ch1", self.query_after_acquire, "MEAS:PER? CHAN1"), - ("measure.pulse_width.ch1", self.query_after_acquire, "MEAS:PWID? CHAN1"), - ("measure.vamplitude.ch1", self.query_after_acquire, "MEAS:VAMP? CHAN1"), - ( - "measure.vaverage.cycle.ch1", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN1", - ), - ( - "measure.vaverage.display.ch1", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN1", - ), - ("measure.vbase.ch1", self.query_after_acquire, "MEAS:VBAS? CHAN1"), - ("measure.vtop.ch1", self.query_after_acquire, "MEAS:VTOP? CHAN1"), - ("measure.vmax.ch1", self.query_after_acquire, "MEAS:VMAX? CHAN1"), - ("measure.vmin.ch1", self.query_after_acquire, "MEAS:VMIN? CHAN1"), - ("measure.vpp.ch1", self.query_after_acquire, "MEAS:VPP? CHAN1"), - ( - "measure.vrms.dc.cycle.ch1", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN1", - ), - ( - "measure.vrms.dc.display.ch1", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN1", - ), - ( - "measure.vrms.ac.cycle.ch1", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN1", - ), - ( - "measure.vrms.ac.display.ch1", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN1", - ), - ("measure.xmax.ch1", self.query_after_acquire, "MEAS:XMAX? CHAN1"), - ("measure.xmin.ch1", self.query_after_acquire, "MEAS:XMIN? CHAN1"), - # Ch2 Measure - ("measure.counter.ch2", self.query_after_acquire, "MEAS:COUN? CHAN2"), - ("measure.duty.ch2", self.query_after_acquire, "MEAS:DUTY? CHAN2"), - ("measure.rise_time.ch2", self.query_after_acquire, "MEAS:RIS? CHAN2"), - ("measure.fall_time.ch2", self.query_after_acquire, "MEAS:FALL? CHAN2"), - ("measure.frequency.ch2", self.query_after_acquire, "MEAS:FREQ? CHAN2"), - ( - "measure.cnt_edge_rising.ch2", - self.query_after_acquire, - "MEAS:NEDG? CHAN2", - ), - ( - "measure.cnt_edge_falling.ch2", - self.query_after_acquire, - "MEAS:PEDG? CHAN2", - ), - ( - "measure.cnt_pulse_positive.ch2", - self.query_after_acquire, - "MEAS:NPUL? CHAN2", - ), - ( - "measure.cnt_pulse_negative.ch2", - self.query_after_acquire, - "MEAS:PPUL? CHAN2", - ), - ("measure.period.ch2", self.query_after_acquire, "MEAS:PER? CHAN2"), - ("measure.pulse_width.ch2", self.query_after_acquire, "MEAS:PWID? CHAN2"), - ("measure.vamplitude.ch2", self.query_after_acquire, "MEAS:VAMP? CHAN2"), - ( - "measure.vaverage.cycle.ch2", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN2", - ), - ( - "measure.vaverage.display.ch2", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN2", - ), - ("measure.vbase.ch2", self.query_after_acquire, "MEAS:VBAS? CHAN2"), - ("measure.vtop.ch2", self.query_after_acquire, "MEAS:VTOP? CHAN2"), - ("measure.vmax.ch2", self.query_after_acquire, "MEAS:VMAX? CHAN2"), - ("measure.vmin.ch2", self.query_after_acquire, "MEAS:VMIN? CHAN2"), - ("measure.vpp.ch2", self.query_after_acquire, "MEAS:VPP? CHAN2"), - ( - "measure.vrms.dc.cycle.ch2", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN2", - ), - ( - "measure.vrms.dc.display.ch2", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN2", - ), - ( - "measure.vrms.ac.cycle.ch2", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN2", - ), - ( - "measure.vrms.ac.display.ch2", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN2", - ), - ("measure.xmax.ch2", self.query_after_acquire, "MEAS:XMAX? CHAN2"), - ("measure.xmin.ch2", self.query_after_acquire, "MEAS:XMIN? CHAN2"), - # Ch3 Measure - ("measure.counter.ch3", self.query_after_acquire, "MEAS:COUN? CHAN3"), - ("measure.duty.ch3", self.query_after_acquire, "MEAS:DUTY? CHAN3"), - ("measure.fall_time.ch3", self.query_after_acquire, "MEAS:FALL? CHAN3"), - ("measure.rise_time.ch3", self.query_after_acquire, "MEAS:RIS? CHAN3"), - ("measure.frequency.ch3", self.query_after_acquire, "MEAS:FREQ? CHAN3"), - ( - "measure.cnt_edge_rising.ch3", - self.query_after_acquire, - "MEAS:NEDG? CHAN3", - ), - ( - "measure.cnt_edge_falling.ch3", - self.query_after_acquire, - "MEAS:PEDG? CHAN3", - ), - ( - "measure.cnt_pulse_positive.ch3", - self.query_after_acquire, - "MEAS:NPUL? CHAN3", - ), - ( - "measure.cnt_pulse_negative.ch3", - self.query_after_acquire, - "MEAS:PPUL? CHAN3", - ), - ("measure.period.ch3", self.query_after_acquire, "MEAS:PER? CHAN3"), - ("measure.pulse_width.ch3", self.query_after_acquire, "MEAS:PWID? CHAN3"), - ("measure.vamplitude.ch3", self.query_after_acquire, "MEAS:VAMP? CHAN3"), - ( - "measure.vaverage.cycle.ch3", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN3", - ), - ( - "measure.vaverage.display.ch3", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN3", - ), - ("measure.vbase.ch3", self.query_after_acquire, "MEAS:VBAS? CHAN3"), - ("measure.vtop.ch3", self.query_after_acquire, "MEAS:VTOP? CHAN3"), - ("measure.vmax.ch3", self.query_after_acquire, "MEAS:VMAX? CHAN3"), - ("measure.vmin.ch3", self.query_after_acquire, "MEAS:VMIN? CHAN3"), - ("measure.vpp.ch3", self.query_after_acquire, "MEAS:VPP? CHAN3"), - ( - "measure.vrms.dc.cycle.ch3", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN3", - ), - ( - "measure.vrms.dc.display.ch3", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN3", - ), - ( - "measure.vrms.ac.cycle.ch3", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN3", - ), - ( - "measure.vrms.ac.display.ch3", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN3", - ), - ("measure.xmax.ch3", self.query_after_acquire, "MEAS:XMAX? CHAN3"), - ("measure.xmin.ch3", self.query_after_acquire, "MEAS:XMIN? CHAN3"), - # Ch4 Measure - ("measure.counter.ch4", self.query_after_acquire, "MEAS:COUN? CHAN4"), - ("measure.duty.ch4", self.query_after_acquire, "MEAS:DUTY? CHAN4"), - ("measure.fall_time.ch4", self.query_after_acquire, "MEAS:FALL? CHAN4"), - ("measure.rise_time.ch4", self.query_after_acquire, "MEAS:RIS? CHAN4"), - ("measure.frequency.ch4", self.query_after_acquire, "MEAS:FREQ? CHAN4"), - ( - "measure.cnt_edge_rising.ch4", - self.query_after_acquire, - "MEAS:NEDG? CHAN4", - ), - ( - "measure.cnt_edge_falling.ch4", - self.query_after_acquire, - "MEAS:PEDG? CHAN4", - ), - ( - "measure.cnt_pulse_positive.ch4", - self.query_after_acquire, - "MEAS:NPUL? CHAN4", - ), - ( - "measure.cnt_pulse_negative.ch4", - self.query_after_acquire, - "MEAS:PPUL? CHAN4", - ), - ("measure.period.ch4", self.query_after_acquire, "MEAS:PER? CHAN4"), - ("measure.pulse_width.ch4", self.query_after_acquire, "MEAS:PWID? CHAN4"), - ("measure.vamplitude.ch4", self.query_after_acquire, "MEAS:VAMP? CHAN4"), - ( - "measure.vaverage.cycle.ch4", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN4", - ), - ( - "measure.vaverage.display.ch4", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN4", - ), - ("measure.vbase.ch4", self.query_after_acquire, "MEAS:VBAS? CHAN4"), - ("measure.vtop.ch4", self.query_after_acquire, "MEAS:VTOP? CHAN4"), - ("measure.vmax.ch4", self.query_after_acquire, "MEAS:VMAX? CHAN4"), - ("measure.vmin.ch4", self.query_after_acquire, "MEAS:VMIN? CHAN4"), - ("measure.vpp.ch4", self.query_after_acquire, "MEAS:VPP? CHAN4"), - ( - "measure.vrms.dc.cycle.ch4", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN4", - ), - ( - "measure.vrms.dc.display.ch4", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN4", - ), - ( - "measure.vrms.ac.cycle.ch4", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN4", - ), - ( - "measure.vrms.ac.display.ch4", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN4", - ), - ("measure.xmax.ch4", self.query_after_acquire, "MEAS:XMAX? CHAN4"), - ("measure.xmin.ch4", self.query_after_acquire, "MEAS:XMIN? CHAN4"), - # Function Measure - ("measure.duty.function", self.query_after_acquire, "MEAS:DUTY? FUNC"), - ("measure.fall_time.function", self.query_after_acquire, "MEAS:FALL? FUNC"), - ("measure.rise_time.function", self.query_after_acquire, "MEAS:RIS? FUNC"), - ("measure.frequency.function", self.query_after_acquire, "MEAS:FREQ? FUNC"), - ( - "measure.cnt_edge_rising.function", - self.query_after_acquire, - "MEAS:NEDG? FUNC", - ), - ( - "measure.cnt_edge_falling.function", - self.query_after_acquire, - "MEAS:PEDG? FUNC", - ), - ( - "measure.cnt_pulse_positive.function", - self.query_after_acquire, - "MEAS:NPUL? FUNC", - ), - ( - "measure.cnt_pulse_negative.function", - self.query_after_acquire, - "MEAS:PPUL? FUNC", - ), - ("measure.period.function", self.query_after_acquire, "MEAS:PER? FUNC"), - ( - "measure.pulse_width.function", - self.query_after_acquire, - "MEAS:PWID? FUNC", - ), - ( - "measure.vamplitude.function", - self.query_after_acquire, - "MEAS:VAMP? FUNC", - ), - ( - "measure.vaverage.cycle.function", - self.query_after_acquire, - "MEAS:VAV? CYCL,FUNC", - ), - ( - "measure.vaverage.display.function", - self.query_after_acquire, - "MEAS:VAV? DISP,FUNC", - ), - ("measure.vbase.function", self.query_after_acquire, "MEAS:VBAS? FUNC"), - ("measure.vtop.function", self.query_after_acquire, "MEAS:VTOP? FUNC"), - ("measure.vmax.function", self.query_after_acquire, "MEAS:VMAX? FUNC"), - ("measure.vmin.function", self.query_after_acquire, "MEAS:VMIN? FUNC"), - ("measure.vpp.function", self.query_after_acquire, "MEAS:VPP? FUNC"), - ( - "measure.vrms.dc.cycle.function", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,FUNC", - ), - ( - "measure.vrms.dc.display.function", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,FUNC", - ), - ( - "measure.vrms.ac.cycle.function", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,FUNC", - ), - ( - "measure.vrms.ac.display.function", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,FUNC", - ), - ("measure.xmax.function", self.query_after_acquire, "MEAS:XMAX? FUNC"), - ("measure.xmin.function", self.query_after_acquire, "MEAS:XMIN? FUNC"), - # MATH,Measure - ("measure.duty.math", self.query_after_acquire, "MEAS:DUTY? MATH"), - ("measure.fall_time.math", self.query_after_acquire, "MEAS:FALL? MATH"), - ("measure.rise_time.math", self.query_after_acquire, "MEAS:RIS? MATH"), - ("measure.frequency.math", self.query_after_acquire, "MEAS:FREQ? MATH"), - ( - "measure.cnt_edge_rising.math", - self.query_after_acquire, - "MEAS:NEDG? MATH", - ), - ( - "measure.cnt_edge_falling.math", - self.query_after_acquire, - "MEAS:PEDG? MATH", - ), - ( - "measure.cnt_pulse_positive.math", - self.query_after_acquire, - "MEAS:NPUL? MATH", - ), - ( - "measure.cnt_pulse_negative.math", - self.query_after_acquire, - "MEAS:PPUL? MATH", - ), - ("measure.period.math", self.query_after_acquire, "MEAS:PER? MATH"), - ("measure.pulse_width.math", self.query_after_acquire, "MEAS:PWID? MATH"), - ("measure.vamplitude.math", self.query_after_acquire, "MEAS:VAMP? MATH"), - ( - "measure.vaverage.cycle.math", - self.query_after_acquire, - "MEAS:VAV? CYCL,MATH", - ), - ( - "measure.vaverage.display.math", - self.query_after_acquire, - "MEAS:VAV? DISP,MATH", - ), - ("measure.vbase.math", self.query_after_acquire, "MEAS:VBAS? MATH"), - ("measure.vtop.math", self.query_after_acquire, "MEAS:VTOP? MATH"), - ("measure.vmax.math", self.query_after_acquire, "MEAS:VMAX? MATH"), - ("measure.vmin.math", self.query_after_acquire, "MEAS:VMIN? MATH"), - ("measure.vpp.math", self.query_after_acquire, "MEAS:VPP? MATH"), - ( - "measure.vrms.dc.cycle.math", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,MATH", - ), - ( - "measure.vrms.dc.display.math", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,MATH", - ), - ( - "measure.vrms.ac.cycle.math", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,MATH", - ), - ( - "measure.vrms.ac.display.math", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,MATH", - ), - ("measure.xmax.math", self.query_after_acquire, "MEAS:XMAX? MATH"), - ("measure.xmin.math", self.query_after_acquire, "MEAS:XMIN? MATH"), - ] - self.init_api() self.instrument.timeout = 1000 - def single(self): + def single(self) -> None: """ - Sets up the oscilliscope for a single shot triggered measurement - Does multiple steps to ensure that at the end of this call that the oscilloscope is primed for a trigger - 1. Sets mode to stop to ensure that only one measurement will occur - 2. Clears the status registers - 3. Sets the mode to single - 4. Monitors the registers until the trigger is armed - :return: + Specific implementation of the single trigger setup for Agilent MSO-X """ self._triggers_read = 0 self._raise_if_error() # Raises if any errors were made during setup @@ -582,9 +72,6 @@ def stop(self): self._mode = "STOP" self._wave_acquired = False - def _write(self, value): - self.instrument.write(value) - def acquire(self, acquire_type="normal", averaging_samples=0): """ :param channel @@ -612,7 +99,7 @@ def acquire(self, acquire_type="normal", averaging_samples=0): raise ValueError("Invalid acquire type {}".format(acquire_type)) def waveform_preamble(self): - values = self.query_ascii_values(":WAV:PRE?") + values = self.instrument.query_ascii_values(":WAV:PRE?") wav_form_dict = {"0": "BYTE", "1": "WORD", "4": "ASCii"} acq_type_dict = { "0": "NORMAL", @@ -702,7 +189,7 @@ def waveform_values(self, signal, file_name="", file_type="csv"): self.instrument.write(":WAVeform:POINts:MODE " + points_mode) # Check if there is actually data to acquire: - data_available = int(self.query(":WAVeform:POINTs?")) + data_available = int(self.instrument.query(":WAVeform:POINTs?")) if data_available == 0: # No data is available # Setting a channel to be a waveform source turns it on, so we need to turn it off now: @@ -763,27 +250,6 @@ def waveform_values(self, signal, file_name="", file_type="csv"): raise NotImplementedError("Binary Output not implemented") return x, y - def digitize(self, signals): - signals = [self.validate_signal(sig) for sig in signals] - self.write(":DIG {}".format(",".join(signals))) - return signals - - def validate_signal(self, signal): - """ - :param signal: String ie. "1", "2", "3", "4", "func", "math" - :return: - """ - try: - if not (1 <= int(signal) <= 4): - raise ValueError("Invalid source channel {}".format(signal)) - else: - signal = "CHAN{}".format(int(signal)) - except ValueError: - if signal.lower() not in ["func", "math"]: - raise ValueError("Invalid source channel {}".format(signal)) - signal = signal.lower() - return signal - def reset(self): self.instrument.write("*CLS;*RST;:STOP") time.sleep(0.15) @@ -796,7 +262,7 @@ def save_setup(self, file_name): self.instrument.timeout = 5000 try: with open(file_name, "w") as f: - setup = self.query(":SYSTem:SETup?") + setup = self.instrument.query(":SYSTem:SETup?") f.write(setup) finally: self.instrument.timeout = 1000 @@ -810,43 +276,6 @@ def load_setup(self, file_name): finally: self.instrument.timeout = 1000 - def query(self, value): - try: - response = self.instrument.query(value) - finally: - self._raise_if_error() - return response - - def query_bool(self, value): - return bool(self.query_ascii_value(value)) - - def query_binary_values(self, value): - response = self.instrument.query_binary_values(value) - self._raise_if_error() - return response - - def query_ascii_values(self, value): - response = self.instrument.query_ascii_values(value) - self._raise_if_error() - return response - - def query_ascii_value(self, value): - return self.query_ascii_values(value)[0] - - def query_value(self, base_str, *args, **kwargs): - formatted_string = self._format_string(base_str, **kwargs) - return self.query_ascii_value(formatted_string) - - def query_after_acquire(self, base_str, *args, **kwargs): - self.wait_for_acquire() - try: - formatted_string = self._format_string(base_str, **kwargs) - return self.instrument.query_ascii_values(formatted_string)[0] - except: - self.instrument.close() - self.instrument.open() - raise - def wait_for_trigger(self, timeout): """ Waits for trigger for a set amount of time. @@ -912,12 +341,7 @@ def wait_for_acquire(self): "Cannot acquire waveform in this mode: {}".format(self._mode) ) - def read_raw(self): - data = self.instrument.read_raw() - self._raise_if_error() - return data - - def _check_errors(self): + def _check_errors(self) -> tuple[int, str]: time.sleep(0.1) resp = self.instrument.query("SYST:ERR?") code, msg = resp.strip("\n").split(",") @@ -925,29 +349,6 @@ def _check_errors(self): msg = msg.strip('"') return code, msg - def _raise_if_error(self): - errors = [] - while True: - code, msg = self._check_errors() - if code != 0: - errors.append((code, msg)) - else: - break - if errors: - raise InstrumentError( - "Error(s) Returned from DSO\n" - + "\n".join( - ["Code: {}\nMessage:{}".format(code, msg) for code, msg in errors] - ) - ) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.instrument.close() - self.is_connected = False - def write(self, base_str, *args, **kwargs): formatted_string = self._format_string(base_str, **kwargs) self._write(formatted_string) @@ -962,27 +363,6 @@ def _format_string(self, base_str, **kwargs): prev_string = cur_string return cur_string - def store(self, store_dict, *args, **kwargs): - """ - Store a dictionary of values in TestClass - :param kwargs: - Dictionary containing the parameters to store - :return: - """ - new_dict = store_dict.copy() - for k, v in store_dict.items(): - # I want the same function from write to set up the string before putting it in new_dict - try: - new_dict[k] = v.format(**kwargs) - except: - pass - self._store.update(new_dict) - - def store_and_write(self, params, *args, **kwargs): - base_str, store_dict = params - self.store(store_dict) - self.write(base_str, *args, **kwargs) - def get_identity(self) -> str: """ :return: AGILENT TECHNOLOGIES,,,X.XX.XX diff --git a/src/fixate/drivers/dso/helper.py b/src/fixate/drivers/dso/helper.py index bda2fb7..c0a5f71 100644 --- a/src/fixate/drivers/dso/helper.py +++ b/src/fixate/drivers/dso/helper.py @@ -1,829 +1,517 @@ import inspect from abc import ABCMeta, abstractmethod -from functools import update_wrapper -from fixate.core.exceptions import InstrumentFeatureUnavailable +from typing import Callable, Literal, Union -import typing +from fixate.core.exceptions import InstrumentFeatureUnavailable, InstrumentError -number = typing.Union[float, int] +number = Union[float, int] +""" +Callbacks +Write and query callbacks get passed down the tree of class variables. -class CallableNoArgs: - def __call__(self): - return self._call() +They are then used to facilitate a single point of communication between +the scope and the PC. This has the benefit of being able to mock the interface. +ie we can create a mock DSO that inherts the base and redefines functions. +""" +WriteCallback = Callable[[str], None] +QueryAsciiValuesCallback = Callable[[str], float] - def _call(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +# Prompt that a featrue is not available: +def unavailable(name: str | None = None): + label = name or inspect.stack()[1].function + return InstrumentFeatureUnavailable(f"{label} not available on this device") + + +class Channel: + """ + DSO Channel implementation + """ + + def __init__( + self, channel_name: str, write: WriteCallback, enabled: bool = True + ) -> None: + # Set enabled to false to note a device does not have this channel + self.enabled = enabled + self._ch_name = channel_name + self._write = write + self._ch_cmd = f"CHAN{self._ch_name}" + + self.coupling = Coupling(write, self._ch_cmd) + self.probe = Probe(write, self._ch_cmd) + def _check_en(self): + if not self.enabled: + raise unavailable(f"{self._ch_cmd} is not available on this device") -class CallableBool: def __call__(self, value: bool): - return self._call(value) + self._check_en() + self._write(f"CHAN1:DISP {int(value)}") - def _call(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def scale(self, value: number) -> None: + self._check_en() + self._write(f"{self._ch_cmd}:SCAL {value}") + def offset(self, value: number) -> None: + self._check_en() + self._write(f"{self._ch_cmd}:OFFS {value}") -class SourcesCh: - def ch1(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def invert(self, value: bool) -> None: + self._check_en() + self._write(f"{self._ch_cmd}:INV {int(value)}") - def ch2(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def ch3(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class Coupling: + """ + Defines the coupling interface + """ - def ch4(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def __init__(self, write: WriteCallback, cmd_prefix: str): + self._write = write + self._cmd_prefix = cmd_prefix + def ac(self) -> None: + # AC Coupling + self._write(f"{self._cmd_prefix}:COUP AC") -class SourcesSpecial: - def function(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def dc(self) -> None: + # DC Coupling + self._write(f"{self._cmd_prefix}:COUP DC") - def math(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class Probe: + def __init__(self, write: WriteCallback, cmd_prefix: str): + self._write = write + self._cmd_prefix = cmd_prefix -class SourcesWMem: - def wmemory1(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def attenuation(self, attenuation: number) -> None: + self._write(f"{self._cmd_prefix}:PROB {attenuation}") - def wmemory2(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class TimeBase: + def __init__(self, write: WriteCallback): + self._write = write -class SourcesExt: - def external(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def scale(self, value: number) -> None: + self._write(f"TIM:SCAL {value}") + def position(self, value: number) -> None: + self._write(f"TIM:POS {value}") -class SourcesDig: - def d0(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def d1(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class TriggerCoupling(Coupling): + """ + Extends the Coupling interface with extra functions that the trigger has + """ - def d2(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def lf_reject(self) -> None: + self._write(f"{self._cmd_prefix}:COUP LFR") - def d3(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def d4(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class TriggerSweep: + def __init__(self, write: WriteCallback) -> None: + self._write = write - def d5(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def auto(self) -> None: + self._write("TRIG:SWE AUTO") - def d6(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def normal(self) -> None: + self._write("TRIG:SWE NORM") - def d7(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def d8(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class TriggerMode: + """ + Define the trigger mode interface + """ - def d9(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def __init__(self, write: WriteCallback): + self.edge = TriggerEdge(write) - def d10(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def d11(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class TriggerSlopes: + def __init__(self, write: WriteCallback) -> None: + self._write = write - def d12(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def rising(self) -> None: + self._write("TRIG:EDGE:SLOPE POS") - def d13(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def falling(self) -> None: + self._write("TRIG:EDGE:SLOPE NEG") - def d14(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def either(self) -> None: + self._write("TRIG:EDGE:SLOPE EITH") - def d15(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def alternating(self) -> None: + self._write("TRIG:EDGE:SLOPE ALT") -class MeasureAllSources( - SourcesCh, SourcesSpecial, SourcesWMem, SourcesDig, CallableNoArgs -): - pass - - -class TrigSources(SourcesCh, SourcesExt, SourcesDig): - pass - - -class MultiMeasureSources(MeasureAllSources): - def __init__(self): - self.ch1 = MeasureAllSources() - self.ch1 = MeasureAllSources() - self.ch2 = MeasureAllSources() - self.ch3 = MeasureAllSources() - self.ch4 = MeasureAllSources() - self.function = MeasureAllSources() - self.math = MeasureAllSources() - self.wmemory1 = MeasureAllSources() - self.wmemory2 = MeasureAllSources() - self.external = MeasureAllSources() - self.d0 = MeasureAllSources() - self.d1 = MeasureAllSources() - self.d2 = MeasureAllSources() - self.d3 = MeasureAllSources() - self.d4 = MeasureAllSources() - self.d5 = MeasureAllSources() - self.d6 = MeasureAllSources() - self.d7 = MeasureAllSources() - self.d8 = MeasureAllSources() - self.d9 = MeasureAllSources() - self.d10 = MeasureAllSources() - self.d11 = MeasureAllSources() - self.d12 = MeasureAllSources() - self.d13 = MeasureAllSources() - self.d14 = MeasureAllSources() - self.d15 = MeasureAllSources() +class TriggerSources: + def __init__(self, write: WriteCallback) -> None: + self._write = write + def ch1(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN1") -class Coupling: - def ac(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch2(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN2") - def dc(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch3(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN3") - def lf_reject(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch4(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN4") - def tv(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class TriggerEdge: + def __init__(self, write: WriteCallback) -> None: + self._write = write + self.source = TriggerSources(write) + self.slope = TriggerSlopes(write) -class Probe: - def attenuation(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def __call__(self) -> None: + self._write("TRIG:MODE EDGE") + def level(self, value: number) -> None: + self._write(f"TRIG:EDGE:LEVEL {value}") -class VerticalUnits: - def volts(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def amps(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class Trigger: + """ + Trigger base class + """ + def __init__(self, write: WriteCallback): + self._write = write -class ChannelBase(CallableBool): - def __init__(self, channel_name: str): - self._ch_name = channel_name - # self.waveform = Waveform() - # self.modulate = Modulate() - # self.burst = Burst() - # self.load = Load() - self.coupling = Coupling() - self.probe = Probe() - self.units = VerticalUnits() - - def bandwidth(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + self.mode = TriggerMode(write) + self.sweep = TriggerSweep(write) + self.coupling = TriggerCoupling(write, "TRIG") - def bandwidth_limit(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def hf_reject(self, value: bool) -> None: + self._write(f"TRIG:HFR {int(value)}") - def impedance(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def invert(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class Acquire: + def __init__(self, write: WriteCallback): + self._write = write - def offset(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def normal(self) -> None: + self._write(f"ACQ:TYPE NORM") - def scale(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def peak_detect(self) -> None: + self._write(f"ACQ:TYPE PEAK") + def averaging(self, value) -> None: + self._write(f"ACQ:TYPE AVER;:ACQ:COUN {value}") -class Trigger: - def __init__(self): - self.mode = TrigMode() - self.delay = None - self.eburst = None - self.coupling = Coupling() - self.sweep = TrigSweep() - - def force(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def high_resolution(self) -> None: + self._write(f"ACQ:TYPE HRES") - def hf_reject(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def hold_off(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +# Yes this sucks... +class MeasurementBase: + def __init__(self, query: QueryAsciiValuesCallback, command: str): + self._query = query + self._command = command - def n_reject(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch1(self): + return self._query(self._command + " CHAN1") + def ch2(self): + return self._query(self._command + " CHAN2") -class TrigSweep: - def auto(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch3(self): + return self._query(self._command + " CHAN3") - def normal(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch4(self): + return self._query(self._command + " CHAN4") + def function(self): + return self._query(self._command + " FUNC") -class TrigLevel: - def high(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def math(self): + return self._query(self._command + " MATH") - def low(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class VAverageMeasurement: + def __init__(self, query: QueryAsciiValuesCallback) -> None: + self.cycle = MeasurementBase(query, "MEAS:VAV? CYCL,") + self.display = MeasurementBase(query, "MEAS:VAV? DISP,") -class TrigMode: - def __init__(self): - self.edge = TrigEdge() +class VRmsModesMeasurement: + def __init__(self, query: QueryAsciiValuesCallback, mode: str) -> None: + self.cycle = MeasurementBase(query, f"MEAS:VRMS? CYCL,{mode},") + self.display = MeasurementBase(query, f"MEAS:VRMS? DISP,{mode},") -class TrigEdge(CallableNoArgs): - def __init__(self): - self.source = TrigSources() - self.slope = Slopes() - def level(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class VRmsMeasurement: + def __init__(self, query: QueryAsciiValuesCallback) -> None: + self.dc = VRmsModesMeasurement(query, "DC") + self.ac = VRmsModesMeasurement(query, "AC") -class TrigReject: - def off(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class SourceSelector: + """ + Retain compatibility of source selection from the old API + dso.source1.ch1() for example will set the source1.source = "CHAN1" + This is then queried by the two source measurement functions like 'delay' + You must set the sources before calling a multi measurement function. + """ - def lf(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def __init__(self, store: dict, source: Literal["source1"] | Literal["source2"]): + self._store = store + # Name of the source: + self.source = source - def hf(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch1(self) -> None: + self._store[self.source] = "CHAN1" + def ch2(self) -> None: + self._store[self.source] = "CHAN2" -class Slopes: - def rising(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch3(self) -> None: + self._store[self.source] = "CHAN3" - def falling(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def ch4(self) -> None: + self._store[self.source] = "CHAN4" - def alternating(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def function(self) -> None: + self._store[self.source] = "FUNC" - def either(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def math(self) -> None: + self._store[self.source] = "MATH" + def wmemory1(self) -> None: + self._store[self.source] = "WMEM1" -class Acquire: - def __init__(self): - self.mode = AcquireMode() + def wmemory2(self) -> None: + self._store[self.source] = "WMEM2" - def normal(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def peak_detect(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class DefineThreshold: + def __init__(self, write: WriteCallback) -> None: + self._write = write - def averaging(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def percent(self, upper, middle, lower) -> None: + self._write(f"MEAS:DEF THR,PERC,{upper},{middle},{lower}") - def high_resolution(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def absolute(self, upper, middle, lower) -> None: + self._write(f"MEAS:DEF THR,ABS,{upper},{middle},{lower}") -class AcquireMode: - def rtim(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class Define: + def __init__(self, write: WriteCallback) -> None: + self.threshold = DefineThreshold(write) - def segm(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class DelayEdges: + def __init__(self, write: WriteCallback) -> None: + self._write = write -class Timebase: - def __init__(self): - self.mode = TimebaseMode() + def rising_rising(self) -> None: + self._write("MEAS:DEF DEL, +1, +1") - def position(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def rising_falling(self) -> None: + self._write("MEAS:DEF DEL, +1, -1") - def scale(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def falling_rising(self) -> None: + self._write("MEAS:DEF DEL, -1, +1") + def falling_falling(self) -> None: + self._write("MEAS:DEF DEL, -1, -1") -class TimebaseMode: - def main(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - def window(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class DelayMeasurement: + def __init__( + self, write: WriteCallback, query: QueryAsciiValuesCallback, store: dict + ) -> None: + self._write = write + self._query = query + self._store = store + self.edges = DelayEdges(write) - def xy(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) + def __call__(self) -> float: + return self._query( + f"MEAS:DEL? {self._store['source1']},{self._store['source2']}" ) - def roll(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class VRatioMeasurement: + def __init__(self, query: QueryAsciiValuesCallback, store: dict) -> None: + self._query = query + self._store = store -class Events: - def trigger(self): - """ - Indicates if a trigger event has occurred. - Calls to this will clear the existing trigger events - :return: - """ - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) + def cycle(self) -> float: + return self._query( + f"MEAS:VRAT? CYCL,{self._store['source1']},{self._store['source2']}" ) + def display(self) -> float: + return self._query( + f"MEAS:VRAT? DISP,{self._store['source1']},{self._store['source2']}" + ) -class MeasureInterval: - def __init__(self): - self.cycle = MeasureAllSources() - self.display = MeasureAllSources() +class PhaseMeasurement: + def __init__(self, query: QueryAsciiValuesCallback, store: dict) -> None: + self._query = query + self._store = store -class MeasureIntervalMultipleSources: - def __init__(self): - self.cycle = MultiMeasureSources() - self.display = MultiMeasureSources() + def __call__(self) -> float: + return self._query( + f"MEAS:PHAS? {self._store['source1']},{self._store['source2']}" + ) -class MeasureRMS: - def __init__(self): - self.dc = MeasureInterval() - self.ac = MeasureInterval() +class Measure: + def __init__( + self, write: WriteCallback, query: QueryAsciiValuesCallback, store: dict + ) -> None: + self.counter = MeasurementBase(query, "MEAS:COUN?") + self.duty = MeasurementBase(query, "MEAS:DUTY?") + self.fall_time = MeasurementBase(query, "MEAS:FALL?") + self.rise_time = MeasurementBase(query, "MEAS:RIS?") + self.frequency = MeasurementBase(query, "MEAS:FREQ?") + self.cnt_edge_rising = MeasurementBase(query, "MEAS:NEDG?") + self.cnt_edge_falling = MeasurementBase(query, "MEAS:PEDG?") + self.cnt_pulse_positive = MeasurementBase(query, "MEAS:NPUL?") + self.cnt_pulse_negative = MeasurementBase(query, "MEAS:PPUL?") + self.period = MeasurementBase(query, "MEAS:PER?") + self.pulse_width = MeasurementBase(query, "MEAS:PWID?") + self.vamplitude = MeasurementBase(query, "MEAS:VAMP?") + self.vbase = MeasurementBase(query, "MEAS:VBAS?") + self.vtop = MeasurementBase(query, "MEAS:VTOP?") + self.vmax = MeasurementBase(query, "MEAS:VMAX?") + self.vmin = MeasurementBase(query, "MEAS:VMIN?") + self.vpp = MeasurementBase(query, "MEAS:VPP?") + self.xmax = MeasurementBase(query, "MEAS:XMAX?") + self.xmin = MeasurementBase(query, "MEAS:XMIN?") + # These need a little more construction to match the old API: + self.vaverage = VAverageMeasurement(query) + self.vrms = VRmsMeasurement(query) + + # Multi source measurements: + self.define = Define(write) + self.delay = DelayMeasurement(write, query, store) + self.phase = PhaseMeasurement(query, store) + self.vratio = VRatioMeasurement(query, store) -class Threshold: - def percent(self, upper: number, middle: number, lower: number): - """ - :param upper: Upper Threshold - :param middle: Middle Threshold - :param lower: Lower Threshold - :return: - """ - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class DSO(metaclass=ABCMeta): + """ + DSO Base class. + You don't know what version of the scope you will get until run time. + The regex id is going to be the defining factor as to how many channels the DSO has. - def absolute(self, upper: number, middle: number, lower: number): - """ - :param upper: Upper Threshold - :param middle: Middle Threshold - :param lower: Lower Threshold - :return: - """ - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + We can assume for the helper class however that all the functionality is available. + On runtime, the correct instrument is then selected and will error if its not implemented. + """ + REGEX_ID: str = "DSO" + INSTR_TYPE: str = "VISA" -class Define: - def __init__(self): - super().__init__() - self.threshold = Threshold() + def __init__(self, instrument): + self.instrument = instrument + self.samples = 1 + # Callbacks: + _write = self._write_cmd + _query_after_acquire = self._query_after_acquire_cmd + _query_bool = self._query_bool_cmd -class Delay(CallableNoArgs): - def __init__(self): - super().__init__() - self.edges = MultiSlopes() + # Retain for compatibility: + self._store: dict = {} + # ------------------------- + # Construct the API: + # ------------------------- + self.source1 = SourceSelector(self._store, "source1") + self.source2 = SourceSelector(self._store, "source2") -class Measure: - def __init__(self): - self.counter = MeasureAllSources() - self.define = Define() - self.delay = Delay() - self.duty = MeasureAllSources() - self.fall_time = MeasureAllSources() - self.rise_time = MeasureAllSources() - self.frequency = MeasureAllSources() - self.cnt_edge_rising = MeasureAllSources() - self.cnt_edge_falling = MeasureAllSources() - self.cnt_pulse_positive = MeasureAllSources() - self.cnt_pulse_negative = MeasureAllSources() - self.period = MeasureAllSources() - self.phase = MultiMeasureSources() - self.pulse_width = MeasureAllSources() - self.vamplitude = MeasureAllSources() - self.vaverage = MeasureInterval() - self.vbase = MeasureAllSources() - self.vtop = MeasureAllSources() - self.vmax = MeasureAllSources() - self.vmin = MeasureAllSources() - self.vpp = MeasureAllSources() - self.vratio = MeasureIntervalMultipleSources() - self.vrms = MeasureRMS() - self.xmax = MeasureAllSources() - self.xmin = MeasureAllSources() - - -class MultiSlopes(CallableNoArgs): - def __init__(self): - super().__init__() - self.rising = Slopes() - self.falling = Slopes() - self.alternating = Slopes() - self.either = Slopes() + self.ch1 = Channel("1", _write) + self.ch2 = Channel("2", _write) + self.ch3 = Channel("3", _write) + self.ch4 = Channel("4", _write) + self.time_base = TimeBase(_write) -class DSO(metaclass=ABCMeta): - REGEX_ID = "DSO" + self.trigger = Trigger(_write) - def __init__(self, instrument): - self.instrument = instrument - self.samples = 1 - self.api = [] - self.ch1 = ChannelBase("1") - self.ch2 = ChannelBase("2") - self.ch3 = ChannelBase("3") - self.ch4 = ChannelBase("4") - self.chmath = ChannelBase("math") - self.chfunc = ChannelBase("func") - self.coupling = Coupling() - self.probe = Probe() - self.source1 = MultiMeasureSources() - self.source2 = MultiMeasureSources() - self.trigger = Trigger() - self.time_base = Timebase() - self.acquire = Acquire() - self.measure = Measure() - self.events = Events() + self.acquire = Acquire(_write) - @abstractmethod - def acquire(self, acquire_type, averaging_samples): - pass + self.measure = Measure(_write, _query_after_acquire, self._store) + # Mandatory methods to implement per driver: @abstractmethod - def waveform_values(self, source, filename): - pass + def single(self) -> None: + """Sets oscilliscope to take a single shot measurement""" @abstractmethod - def reset(self): - pass + def run(self) -> None: + """Sets the oscilliscope to run mode""" @abstractmethod - def auto_scale(self): - pass + def stop(self) -> None: + """Puts the oscillicope in stop mode""" @abstractmethod - def save_setup(self, file_name): - pass + def reset(self) -> None: + """Resets the oscilliscope""" @abstractmethod - def load_setup(self, file_name): - pass + def wait_for_acquire(self) -> None: + """Block until the current acquisition is complete.""" @abstractmethod - def get_identity(self): - pass - - def run(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def single(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def stop(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def scale(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def offset(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + def get_identity(self) -> str: + """Return the IDN string of the device""" - def init_api(self): - for func_str, handler, base_str in self.api: - *parents, func = func_str.split(".") - parent_obj = self - for parent in parents: - parent_obj = getattr(parent_obj, parent) - func_obc = getattr(parent_obj, func) - setattr(parent_obj, func, self.prepare_string(func_obc, handler, base_str)) - - def prepare_string(self, func, handler, base_str, *args, **kwargs): - def temp_func(*nargs, **nkwargs): - """ - Only formats using **nkwargs - New Temp - :param nargs: - :param nkwargs: - :return: - """ - sig = inspect.signature(func) - keys = [itm[0] for itm in sig.parameters.items()] - for index, param in enumerate(nargs): - nkwargs[keys[index]] = param - # new_str = base_str.format(**nkwargs) - # handler(self, new_str) - return handler(base_str, **nkwargs) - - return update_wrapper(temp_func, func) + @abstractmethod + def _check_errors(self) -> tuple[int, str]: + """Check the error buffer for errors""" + + # Query and write functions: + def _write_cmd(self, cmd: str) -> None: + """Write a SCPI command and raise on instrument error.""" + self.instrument.write(cmd) + + def _query_bool_cmd(self, cmd: str) -> bool: + values = self.instrument.query_ascii_values(cmd) + self._raise_if_error() + return bool(values[0]) + + def _query_after_acquire_cmd(self, cmd: str) -> float: + """Wait for a complete acquisition then query.""" + self.wait_for_acquire() + try: + result = self.instrument.query_ascii_values(cmd)[0] + except Exception: + self.instrument.close() + self.instrument.open() + raise + return result + + def _raise_if_error(self): + errors = [] + while True: + code, msg = self._check_errors() + if code != 0: + errors.append((code, msg)) + else: + break + if errors: + raise InstrumentError( + "Error(s) Returned from DSO\n" + + "\n".join( + ["Code: {}\nMessage:{}".format(code, msg) for code, msg in errors] + ) + ) + + # Context management: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.instrument.close() + self.is_connected = False