From 7de4c195e7d82e0cfe7179f018c6099c20e5cc30 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 4 Feb 2026 07:48:15 +0100 Subject: [PATCH 1/4] Improve types in RTO1000 driver --- .../rohde_schwarz/RTO1000.py | 51 ++++++++++++++----- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/src/qcodes/instrument_drivers/rohde_schwarz/RTO1000.py b/src/qcodes/instrument_drivers/rohde_schwarz/RTO1000.py index 0be47302064d..94ad53118d20 100644 --- a/src/qcodes/instrument_drivers/rohde_schwarz/RTO1000.py +++ b/src/qcodes/instrument_drivers/rohde_schwarz/RTO1000.py @@ -1,6 +1,5 @@ # All manual references are to R&S RTO Digital Oscilloscope User Manual # for firmware 3.65, 2017 - import logging import time import warnings @@ -12,6 +11,8 @@ import qcodes.validators as vals from qcodes.instrument import ( + ChannelList, + ChannelTuple, Instrument, InstrumentBaseKWArgs, InstrumentChannel, @@ -26,9 +27,13 @@ log = logging.getLogger(__name__) -class ScopeTrace(ArrayParameter): +class ScopeTrace(ArrayParameter[npt.NDArray, "RohdeSchwarzRTO1000ScopeChannel"]): def __init__( - self, name: str, instrument: InstrumentChannel, channum: int, **kwargs: Any + self, + name: str, + instrument: "RohdeSchwarzRTO1000ScopeChannel", + channum: int, + **kwargs: Any, ) -> None: """ The ScopeTrace parameter is attached to a channel of the oscilloscope. @@ -53,6 +58,12 @@ def __init__( self.channum = channum self._trace_ready = False + @property + def root_instrument(self) -> "RohdeSchwarzRTO1000": + root_instrument = super().root_instrument + assert isinstance(root_instrument, RohdeSchwarzRTO1000) + return root_instrument + def prepare_trace(self) -> None: """ Prepare the scope for returning data, calculate the setpoints @@ -452,7 +463,7 @@ def __init__( ScopeMeasurement = RohdeSchwarzRTO1000ScopeMeasurement -class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel): +class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel["RohdeSchwarzRTO1000"]): """ Class to hold an input channel of the scope. @@ -462,7 +473,7 @@ class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel): def __init__( self, - parent: Instrument, + parent: "RohdeSchwarzRTO1000", name: str, channum: int, **kwargs: "Unpack[InstrumentBaseKWArgs]", @@ -631,12 +642,12 @@ def __init__( def _set_range(self, value: float) -> None: self.scale.cache.set(value / 10) - self._parent.write(f"CHANnel{self.channum}:RANGe {value}") + self.parent.write(f"CHANnel{self.channum}:RANGe {value}") def _set_scale(self, value: float) -> None: self.range.cache.set(value * 10) - self._parent.write(f"CHANnel{self.channum}:SCALe {value}") + self.parent.write(f"CHANnel{self.channum}:SCALe {value}") ScopeChannel = RohdeSchwarzRTO1000ScopeChannel @@ -966,15 +977,31 @@ def __init__( """Parameter error_next""" # Add the channels to the instrument + scope_channels = ChannelList( + self, "scope_channels", RohdeSchwarzRTO1000ScopeChannel + ) + """ChannelTuple holding the scope channels. + """ for ch in range(1, self.num_chans + 1): chan = RohdeSchwarzRTO1000ScopeChannel(self, f"channel{ch}", ch) + scope_channels.append(chan) self.add_submodule(f"ch{ch}", chan) - + self.scope_channels: ChannelTuple[RohdeSchwarzRTO1000ScopeChannel] = ( + self.add_submodule("scope_channels", scope_channels.to_channel_tuple()) + ) + measurements = ChannelList( + self, "measurements", RohdeSchwarzRTO1000ScopeMeasurement + ) for measId in range(1, self.num_meas + 1): measCh = RohdeSchwarzRTO1000ScopeMeasurement( self, f"measurement{measId}", measId ) + measurements.append(measCh) self.add_submodule(f"meas{measId}", measCh) + self.measurements: ChannelTuple[RohdeSchwarzRTO1000ScopeMeasurement] = ( + self.add_submodule("measurements", measurements.to_channel_tuple()) + ) + """ChannelTuple holding the scope measurements.""" self.add_function("stop", call_cmd="STOP") self.add_function("reset", call_cmd="*RST") @@ -1055,10 +1082,8 @@ def _make_traces_not_ready(self) -> None: """ Make the scope traces be not ready. """ - self.ch1.trace._trace_ready = False - self.ch2.trace._trace_ready = False - self.ch3.trace._trace_ready = False - self.ch4.trace._trace_ready = False + for chan in self.scope_channels: + chan.trace._trace_ready = False def _set_trigger_level(self, value: float) -> None: """ @@ -1071,7 +1096,7 @@ def _set_trigger_level(self, value: float) -> None: source = trans[self.trigger_source.get()] if source != 5: submodule = self.submodules[f"ch{source}"] - assert isinstance(submodule, InstrumentChannel) + assert isinstance(submodule, RohdeSchwarzRTO1000ScopeChannel) v_range = submodule.range() offset = submodule.offset() From a8dabb3ada4c282bb1be40a26d62dc954055ad37 Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 4 Feb 2026 07:51:31 +0100 Subject: [PATCH 2/4] Improve types in Rigol DS1074Z driver --- .../instrument_drivers/rigol/Rigol_DS1074Z.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/qcodes/instrument_drivers/rigol/Rigol_DS1074Z.py b/src/qcodes/instrument_drivers/rigol/Rigol_DS1074Z.py index e8a281589037..b3e196205544 100644 --- a/src/qcodes/instrument_drivers/rigol/Rigol_DS1074Z.py +++ b/src/qcodes/instrument_drivers/rigol/Rigol_DS1074Z.py @@ -19,7 +19,7 @@ from qcodes.parameters import Parameter -class RigolDS1074ZChannel(InstrumentChannel): +class RigolDS1074ZChannel(InstrumentChannel["RigolDS1074Z"]): """ Contains methods and attributes specific to the Rigol oscilloscope channels. @@ -58,9 +58,9 @@ def __init__( """Parameter trace""" def _get_full_trace(self) -> npt.NDArray: - y_ori = self.root_instrument.waveform_yorigin() - y_increm = self.root_instrument.waveform_yincrem() - y_ref = self.root_instrument.waveform_yref() + y_ori = self.parent.waveform_yorigin() + y_increm = self.parent.waveform_yincrem() + y_ref = self.parent.waveform_yref() y_raw = self._get_raw_trace() y_raw_shifted = y_raw - y_ori - y_ref full_data = np.multiply(y_raw_shifted, y_increm) @@ -68,13 +68,13 @@ def _get_full_trace(self) -> npt.NDArray: def _get_raw_trace(self) -> npt.NDArray: # set the out type from oscilloscope channels to WORD - self.root_instrument.write(":WAVeform:FORMat WORD") + self.parent.write(":WAVeform:FORMat WORD") # set the channel from where data will be obtained - self.root_instrument.data_source(f"ch{self.channel}") + self.parent.data_source(f"ch{self.channel}") # Obtain the trace - raw_trace_val = self.root_instrument.visa_handle.query_binary_values( + raw_trace_val = self.parent.visa_handle.query_binary_values( "WAV:DATA?", datatype="h", is_big_endian=False, expect_termination=False ) return np.array(raw_trace_val) @@ -231,10 +231,8 @@ def _get_time_axis(self) -> npt.NDArray: return xdata def _get_trigger_level(self) -> str: - trigger_level = self.root_instrument.ask( - f":TRIGger:{self.trigger_mode()}:LEVel?" - ) + trigger_level = self.ask(f":TRIGger:{self.trigger_mode()}:LEVel?") return trigger_level def _set_trigger_level(self, value: str) -> None: - self.root_instrument.write(f":TRIGger:{self.trigger_mode()}:LEVel {value}") + self.write(f":TRIGger:{self.trigger_mode()}:LEVel {value}") From ef4def535a86d49acb6790f3b9460ec186b5fd2a Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 4 Feb 2026 07:52:58 +0100 Subject: [PATCH 3/4] Add a reset method for type checking --- src/qcodes/instrument_drivers/rigol/Rigol_DG4000.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/qcodes/instrument_drivers/rigol/Rigol_DG4000.py b/src/qcodes/instrument_drivers/rigol/Rigol_DG4000.py index 8d0c2fafe546..873cfc7289d8 100644 --- a/src/qcodes/instrument_drivers/rigol/Rigol_DG4000.py +++ b/src/qcodes/instrument_drivers/rigol/Rigol_DG4000.py @@ -738,13 +738,15 @@ def __init__( # Trace self.add_function("upload_data", call_cmd=self._upload_data, args=[Anything()]) - self.add_function("reset", call_cmd="*RST") - if reset: self.reset() self.connect_message() + def reset(self) -> None: + """Reset the instrument to default settings.""" + self.write("*RST") + def _upload_data(self, data: "Sequence[float] | npt.NDArray") -> None: """ Upload data to the AWG memory. From f4df9e3d538feaa255952c573071c9dd954e658e Mon Sep 17 00:00:00 2001 From: "Jens H. Nielsen" Date: Wed, 4 Feb 2026 07:39:05 +0100 Subject: [PATCH 4/4] Improve types in ZNB driver --- .../instrument_drivers/rohde_schwarz/ZNB.py | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/qcodes/instrument_drivers/rohde_schwarz/ZNB.py b/src/qcodes/instrument_drivers/rohde_schwarz/ZNB.py index 1529846de6c1..f8fad7c1f034 100644 --- a/src/qcodes/instrument_drivers/rohde_schwarz/ZNB.py +++ b/src/qcodes/instrument_drivers/rohde_schwarz/ZNB.py @@ -397,7 +397,7 @@ def get_raw(self) -> npt.NDArray[np.floating]: return self.instrument._get_sweep_data() -class RohdeSchwarzZNBChannel(InstrumentChannel): +class RohdeSchwarzZNBChannel(InstrumentChannel["RohdeSchwarzZNBBase"]): def __init__( self, parent: "RohdeSchwarzZNBBase", @@ -432,7 +432,7 @@ def __init__( if existing_trace_to_bind_to is None: self._tracename = f"Trc{channel}" else: - traces = self._parent.ask("CONFigure:TRACe:CATalog?") + traces = self.parent.ask("CONFigure:TRACe:CATalog?") if existing_trace_to_bind_to not in traces: raise RuntimeError( f"Trying to bind to" @@ -759,12 +759,10 @@ def __init__( def set_electrical_delay_auto(self) -> None: n = self._instrument_channel - self.root_instrument.write(f"SENS{n}:CORR:EDEL:AUTO ONCE") + self.parent.write(f"SENS{n}:CORR:EDEL:AUTO ONCE") def autoscale(self) -> None: - self.root_instrument.write( - f"DISPlay:TRACe1:Y:SCALe:AUTO ONCE, {self._tracename}" - ) + self.parent.write(f"DISPlay:TRACe1:Y:SCALe:AUTO ONCE, {self._tracename}") def _get_format(self, tracename: str) -> str: n = self._instrument_channel @@ -925,7 +923,7 @@ def _get_sweep_data(self, force_polar: bool = False) -> npt.NDArray: # preserve original state of the znb with self.status.set_to(1): - self.root_instrument.cont_meas_off() + self.parent.cont_meas_off() try: # if force polar is set, the SDAT data format will be used. # Here the data will be transferred as a complex number @@ -935,7 +933,7 @@ def _get_sweep_data(self, force_polar: bool = False) -> npt.NDArray: else: data_format_command = "FDAT" - with self.root_instrument.timeout.set_to(self._get_timeout()): + with self.parent.timeout.set_to(self._get_timeout()): # instrument averages over its last 'avg' number of sweeps # need to ensure averaged result is returned for _ in range(self.avg()): @@ -950,7 +948,7 @@ def _get_sweep_data(self, force_polar: bool = False) -> npt.NDArray: if self.format() in ["Polar", "Complex", "Smith", "Inverse Smith"]: data = data[0::2] + 1j * data[1::2] finally: - self.root_instrument.cont_meas_on() + self.parent.cont_meas_on() return data def setup_cw_sweep(self) -> None: @@ -977,7 +975,7 @@ def setup_cw_sweep(self) -> None: self.auto_sweep_time_enabled(True) # Set cont measurement off here so we don't have to send that command # while measuring later. - self.root_instrument.cont_meas_off() + self.parent.cont_meas_off() def setup_lin_sweep(self) -> None: """ @@ -985,7 +983,7 @@ def setup_lin_sweep(self) -> None: """ self.sweep_type("Linear") self.averaging_enabled(True) - self.root_instrument.cont_meas_on() + self.parent.cont_meas_on() def _check_cw_sweep(self) -> None: """ @@ -998,7 +996,7 @@ def _check_cw_sweep(self) -> None: f"mode, instead it is: {self.sweep_type()}" ) - if not self.root_instrument.rf_power(): + if not self.parent.rf_power(): log.warning("RF output is off when getting sweep data") # It is possible that the instrument and QCoDeS disagree about @@ -1016,7 +1014,7 @@ def _check_cw_sweep(self) -> None: # Set the format to complex. self.format("Complex") # Set cont measurement off. - self.root_instrument.cont_meas_off() + self.parent.cont_meas_off() # Cache the sweep time so it is up to date when setting timeouts self.sweep_time() @@ -1027,7 +1025,7 @@ def _get_cw_data(self) -> tuple[npt.NDArray, npt.NDArray]: self._check_cw_sweep() with self.status.set_to(1): - with self.root_instrument.timeout.set_to(self._get_timeout()): + with self.parent.timeout.set_to(self._get_timeout()): self.write(f"INIT{self._instrument_channel}:IMM; *WAI") data_str = self.ask(f"CALC{self._instrument_channel}:DATA? SDAT") data = np.array(data_str.rstrip().split(",")).astype("float64") @@ -1037,7 +1035,7 @@ def _get_cw_data(self) -> tuple[npt.NDArray, npt.NDArray]: return i, q def _get_timeout(self) -> float: - timeout = self.root_instrument.timeout() or float("+inf") + timeout = self.parent.timeout() or float("+inf") timeout = max(self.sweep_time.cache.get() * 1.5, timeout) return timeout