Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/qcodes/instrument_drivers/rigol/Rigol_DG4000.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,13 +738,15 @@ def __init__(
# Trace
self.add_function("upload_data", call_cmd=self._upload_data, args=[Anything()])

self.add_function("reset", call_cmd="*RST")

if reset:
self.reset()

self.connect_message()

def reset(self) -> None:
"""Reset the instrument to default settings."""
self.write("*RST")

def _upload_data(self, data: "Sequence[float] | npt.NDArray") -> None:
"""
Upload data to the AWG memory.
Expand Down
20 changes: 9 additions & 11 deletions src/qcodes/instrument_drivers/rigol/Rigol_DS1074Z.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from qcodes.parameters import Parameter


class RigolDS1074ZChannel(InstrumentChannel):
class RigolDS1074ZChannel(InstrumentChannel["RigolDS1074Z"]):
"""
Contains methods and attributes specific to the Rigol
oscilloscope channels.
Expand Down Expand Up @@ -58,23 +58,23 @@ def __init__(
"""Parameter trace"""

def _get_full_trace(self) -> npt.NDArray:
y_ori = self.root_instrument.waveform_yorigin()
y_increm = self.root_instrument.waveform_yincrem()
y_ref = self.root_instrument.waveform_yref()
y_ori = self.parent.waveform_yorigin()
y_increm = self.parent.waveform_yincrem()
y_ref = self.parent.waveform_yref()
y_raw = self._get_raw_trace()
y_raw_shifted = y_raw - y_ori - y_ref
full_data = np.multiply(y_raw_shifted, y_increm)
return full_data

def _get_raw_trace(self) -> npt.NDArray:
# set the out type from oscilloscope channels to WORD
self.root_instrument.write(":WAVeform:FORMat WORD")
self.parent.write(":WAVeform:FORMat WORD")

# set the channel from where data will be obtained
self.root_instrument.data_source(f"ch{self.channel}")
self.parent.data_source(f"ch{self.channel}")

# Obtain the trace
raw_trace_val = self.root_instrument.visa_handle.query_binary_values(
raw_trace_val = self.parent.visa_handle.query_binary_values(
"WAV:DATA?", datatype="h", is_big_endian=False, expect_termination=False
)
return np.array(raw_trace_val)
Expand Down Expand Up @@ -231,10 +231,8 @@ def _get_time_axis(self) -> npt.NDArray:
return xdata

def _get_trigger_level(self) -> str:
trigger_level = self.root_instrument.ask(
f":TRIGger:{self.trigger_mode()}:LEVel?"
)
trigger_level = self.ask(f":TRIGger:{self.trigger_mode()}:LEVel?")
return trigger_level

def _set_trigger_level(self, value: str) -> None:
self.root_instrument.write(f":TRIGger:{self.trigger_mode()}:LEVel {value}")
self.write(f":TRIGger:{self.trigger_mode()}:LEVel {value}")
51 changes: 38 additions & 13 deletions src/qcodes/instrument_drivers/rohde_schwarz/RTO1000.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# All manual references are to R&S RTO Digital Oscilloscope User Manual
# for firmware 3.65, 2017

import logging
import time
import warnings
Expand All @@ -12,6 +11,8 @@

import qcodes.validators as vals
from qcodes.instrument import (
ChannelList,
ChannelTuple,
Instrument,
InstrumentBaseKWArgs,
InstrumentChannel,
Expand All @@ -26,9 +27,13 @@
log = logging.getLogger(__name__)


class ScopeTrace(ArrayParameter):
class ScopeTrace(ArrayParameter[npt.NDArray, "RohdeSchwarzRTO1000ScopeChannel"]):
def __init__(
self, name: str, instrument: InstrumentChannel, channum: int, **kwargs: Any
self,
name: str,
instrument: "RohdeSchwarzRTO1000ScopeChannel",
channum: int,
**kwargs: Any,
) -> None:
"""
The ScopeTrace parameter is attached to a channel of the oscilloscope.
Expand All @@ -53,6 +58,12 @@ def __init__(
self.channum = channum
self._trace_ready = False

@property
def root_instrument(self) -> "RohdeSchwarzRTO1000":
root_instrument = super().root_instrument
assert isinstance(root_instrument, RohdeSchwarzRTO1000)
return root_instrument

def prepare_trace(self) -> None:
"""
Prepare the scope for returning data, calculate the setpoints
Expand Down Expand Up @@ -452,7 +463,7 @@ def __init__(
ScopeMeasurement = RohdeSchwarzRTO1000ScopeMeasurement


class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel):
class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel["RohdeSchwarzRTO1000"]):
"""
Class to hold an input channel of the scope.

Expand All @@ -462,7 +473,7 @@ class RohdeSchwarzRTO1000ScopeChannel(InstrumentChannel):

def __init__(
self,
parent: Instrument,
parent: "RohdeSchwarzRTO1000",
name: str,
channum: int,
**kwargs: "Unpack[InstrumentBaseKWArgs]",
Expand Down Expand Up @@ -631,12 +642,12 @@ def __init__(
def _set_range(self, value: float) -> None:
self.scale.cache.set(value / 10)

self._parent.write(f"CHANnel{self.channum}:RANGe {value}")
self.parent.write(f"CHANnel{self.channum}:RANGe {value}")

def _set_scale(self, value: float) -> None:
self.range.cache.set(value * 10)

self._parent.write(f"CHANnel{self.channum}:SCALe {value}")
self.parent.write(f"CHANnel{self.channum}:SCALe {value}")


ScopeChannel = RohdeSchwarzRTO1000ScopeChannel
Expand Down Expand Up @@ -966,15 +977,31 @@ def __init__(
"""Parameter error_next"""

# Add the channels to the instrument
scope_channels = ChannelList(
self, "scope_channels", RohdeSchwarzRTO1000ScopeChannel
)
"""ChannelTuple holding the scope channels.
"""
for ch in range(1, self.num_chans + 1):
chan = RohdeSchwarzRTO1000ScopeChannel(self, f"channel{ch}", ch)
scope_channels.append(chan)
self.add_submodule(f"ch{ch}", chan)

self.scope_channels: ChannelTuple[RohdeSchwarzRTO1000ScopeChannel] = (
self.add_submodule("scope_channels", scope_channels.to_channel_tuple())
)
measurements = ChannelList(
self, "measurements", RohdeSchwarzRTO1000ScopeMeasurement
)
for measId in range(1, self.num_meas + 1):
measCh = RohdeSchwarzRTO1000ScopeMeasurement(
self, f"measurement{measId}", measId
)
measurements.append(measCh)
self.add_submodule(f"meas{measId}", measCh)
self.measurements: ChannelTuple[RohdeSchwarzRTO1000ScopeMeasurement] = (
self.add_submodule("measurements", measurements.to_channel_tuple())
)
"""ChannelTuple holding the scope measurements."""

self.add_function("stop", call_cmd="STOP")
self.add_function("reset", call_cmd="*RST")
Expand Down Expand Up @@ -1055,10 +1082,8 @@ def _make_traces_not_ready(self) -> None:
"""
Make the scope traces be not ready.
"""
self.ch1.trace._trace_ready = False
self.ch2.trace._trace_ready = False
self.ch3.trace._trace_ready = False
self.ch4.trace._trace_ready = False
for chan in self.scope_channels:
chan.trace._trace_ready = False

def _set_trigger_level(self, value: float) -> None:
"""
Expand All @@ -1071,7 +1096,7 @@ def _set_trigger_level(self, value: float) -> None:
source = trans[self.trigger_source.get()]
if source != 5:
submodule = self.submodules[f"ch{source}"]
assert isinstance(submodule, InstrumentChannel)
assert isinstance(submodule, RohdeSchwarzRTO1000ScopeChannel)
v_range = submodule.range()
offset = submodule.offset()

Expand Down
28 changes: 13 additions & 15 deletions src/qcodes/instrument_drivers/rohde_schwarz/ZNB.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def get_raw(self) -> npt.NDArray[np.floating]:
return self.instrument._get_sweep_data()


class RohdeSchwarzZNBChannel(InstrumentChannel):
class RohdeSchwarzZNBChannel(InstrumentChannel["RohdeSchwarzZNBBase"]):
def __init__(
self,
parent: "RohdeSchwarzZNBBase",
Expand Down Expand Up @@ -432,7 +432,7 @@ def __init__(
if existing_trace_to_bind_to is None:
self._tracename = f"Trc{channel}"
else:
traces = self._parent.ask("CONFigure:TRACe:CATalog?")
traces = self.parent.ask("CONFigure:TRACe:CATalog?")
if existing_trace_to_bind_to not in traces:
raise RuntimeError(
f"Trying to bind to"
Expand Down Expand Up @@ -759,12 +759,10 @@ def __init__(

def set_electrical_delay_auto(self) -> None:
n = self._instrument_channel
self.root_instrument.write(f"SENS{n}:CORR:EDEL:AUTO ONCE")
self.parent.write(f"SENS{n}:CORR:EDEL:AUTO ONCE")

def autoscale(self) -> None:
self.root_instrument.write(
f"DISPlay:TRACe1:Y:SCALe:AUTO ONCE, {self._tracename}"
)
self.parent.write(f"DISPlay:TRACe1:Y:SCALe:AUTO ONCE, {self._tracename}")

def _get_format(self, tracename: str) -> str:
n = self._instrument_channel
Expand Down Expand Up @@ -925,7 +923,7 @@ def _get_sweep_data(self, force_polar: bool = False) -> npt.NDArray:

# preserve original state of the znb
with self.status.set_to(1):
self.root_instrument.cont_meas_off()
self.parent.cont_meas_off()
try:
# if force polar is set, the SDAT data format will be used.
# Here the data will be transferred as a complex number
Expand All @@ -935,7 +933,7 @@ def _get_sweep_data(self, force_polar: bool = False) -> npt.NDArray:
else:
data_format_command = "FDAT"

with self.root_instrument.timeout.set_to(self._get_timeout()):
with self.parent.timeout.set_to(self._get_timeout()):
# instrument averages over its last 'avg' number of sweeps
# need to ensure averaged result is returned
for _ in range(self.avg()):
Expand All @@ -950,7 +948,7 @@ def _get_sweep_data(self, force_polar: bool = False) -> npt.NDArray:
if self.format() in ["Polar", "Complex", "Smith", "Inverse Smith"]:
data = data[0::2] + 1j * data[1::2]
finally:
self.root_instrument.cont_meas_on()
self.parent.cont_meas_on()
return data

def setup_cw_sweep(self) -> None:
Expand All @@ -977,15 +975,15 @@ def setup_cw_sweep(self) -> None:
self.auto_sweep_time_enabled(True)
# Set cont measurement off here so we don't have to send that command
# while measuring later.
self.root_instrument.cont_meas_off()
self.parent.cont_meas_off()

def setup_lin_sweep(self) -> None:
"""
Setup the instrument into linear sweep mode.
"""
self.sweep_type("Linear")
self.averaging_enabled(True)
self.root_instrument.cont_meas_on()
self.parent.cont_meas_on()

def _check_cw_sweep(self) -> None:
"""
Expand All @@ -998,7 +996,7 @@ def _check_cw_sweep(self) -> None:
f"mode, instead it is: {self.sweep_type()}"
)

if not self.root_instrument.rf_power():
if not self.parent.rf_power():
log.warning("RF output is off when getting sweep data")

# It is possible that the instrument and QCoDeS disagree about
Expand All @@ -1016,7 +1014,7 @@ def _check_cw_sweep(self) -> None:
# Set the format to complex.
self.format("Complex")
# Set cont measurement off.
self.root_instrument.cont_meas_off()
self.parent.cont_meas_off()
# Cache the sweep time so it is up to date when setting timeouts
self.sweep_time()

Expand All @@ -1027,7 +1025,7 @@ def _get_cw_data(self) -> tuple[npt.NDArray, npt.NDArray]:
self._check_cw_sweep()

with self.status.set_to(1):
with self.root_instrument.timeout.set_to(self._get_timeout()):
with self.parent.timeout.set_to(self._get_timeout()):
self.write(f"INIT{self._instrument_channel}:IMM; *WAI")
data_str = self.ask(f"CALC{self._instrument_channel}:DATA? SDAT")
data = np.array(data_str.rstrip().split(",")).astype("float64")
Expand All @@ -1037,7 +1035,7 @@ def _get_cw_data(self) -> tuple[npt.NDArray, npt.NDArray]:
return i, q

def _get_timeout(self) -> float:
timeout = self.root_instrument.timeout() or float("+inf")
timeout = self.parent.timeout() or float("+inf")
timeout = max(self.sweep_time.cache.get() * 1.5, timeout)
return timeout

Expand Down
Loading