import time
import pathlib
import numpy as np
import json
from . import base
class SpectralAnalyser_DSA800(base.VisaHardwareBase):
    """Spectral analyser (resonance experiment).

    Model name: RIGOL DSA800

    Communication:

    - LAN, using VISA protocol and SCPI commands
    """

    # Directory where ``save_data`` writes the trace files.
    SAVE_DIR = pathlib.Path.home() / "saved_data"

    # Message shared by every ``multi_scan`` input-validation error.
    _RANGES_FORMAT_HINT = "Use format [[f1_min, f1_max], [f2_min, f2_max], ...]"

    def __init__(self, ip_address, **kwargs):
        """Build the VISA resource name from the address and set up the cache.

        :param str ip_address: address inserted into ``TCPIP0::<addr>::INSTR``
        :param kwargs: forwarded unchanged to the base class constructor
        """
        super().__init__(
            resource_name="TCPIP0::" + ip_address + "::INSTR",
            timeout=20_000,  # Sometimes, long autocalibration happens
            **kwargs
        )
        # Last known values: fetched traces (keyed by trace id) and the state
        # of the tracking generator as last set through this object.
        self._last = {
            "trace": {},
            "output_active": False,
        }
        self._trace_ids = [1, 2, 3]
        self._active_trace = 1

    # ==== Inherited abstract methods ====

    # def _safestate(self):
    #     pass

    def _readerror(self):
        """Read new error from device.

        :return: error message without the surrounding quotation marks,
            or ``None`` when the reported error code is ``0``
        """
        # Split on the first comma only, so that a comma inside the message
        # text cannot break the (code, message) unpacking.
        code, err = self.raw_scpi(":SYSTEM:ERROR?").strip().split(",", 1)
        if code != "0":
            return err[1:-1]  # Get rid of q. marks ""
        return None

    def _defaultstate(self):
        """Init device to a well defined state. (Auto called after connect.)"""
        # TODO Consider saving these settings as a preset and then only
        # send one command to load it. Sending this many commands is slow.
        # self.raw_scpi(":system:preset:type user1")
        # self.raw_scpi(":system:pon:type preset")  # [ONCE] Set after start-up
        # self.raw_scpi(":system:preset")  # [EACH TIME] or activate manually

        # Reset all settings and restrict non-remote access
        # self.raw_scpi(":system:communicate:brmt on")  # Maybe does not work
        # self.raw_scpi(":system:klock on,all")  # Is this necessary?
        # self.raw_scpi(":display:enable OFF")  # Speeds up all measurements

        # Make sure the tracking generator is turned off
        self.set_tracking_generator(False)

        # Set data format
        self.raw_scpi(":FORMAT:TRACE:DATA ASCII")  # ascii

        # Set axes and sweep
        self.raw_scpi(":UNIT:POWER DBM")

        # Fetch first data
        self.fetch_data()

    def _clear_defaultstate(self):
        """Clean-up device settings (i.e. keys lock). (Auto called before disconnect.)"""
        # self.raw_scpi(":system:klock off,all")
        pass

    # ==== Private methods and commands ====

    def _validate_traceid(self, trace):
        """Convert ``trace`` to int and check that it is a known trace id.

        :param trace: trace id, anything accepted by ``int()``
        :return: the trace id as int
        :raises base.CommandError: if the id is not one of the known trace ids
        """
        trace = int(trace)
        if trace not in self._trace_ids:
            raise base.CommandError("Invalid trace id, use one of {}".format(self._trace_ids))
        return trace

    @base.base_command
    def _fetch_trace(self, trace):
        """Return trace data as a numpy array, shape=(2, n).

        Row 0 is the frequency axis, row 1 holds the values sent by the device.

        :param int trace: id of trace
        """
        response = self.raw_scpi(":TRACE:DATA? TRACE{:d}".format(trace))
        # The reply starts with a header of ``2 + int(response[1])`` characters
        # (presumably a "#<digits><length>" block header -- confirm against the
        # programming guide); the comma separated values follow it.
        payload = response[2 + int(response[1]):]
        values = np.array([float(item) for item in payload.split(",")])
        # TODO Is this xaxis precise?
        xdata = np.linspace(
            float(self.raw_scpi(":FREQUENCY:START?")),
            float(self.raw_scpi(":FREQUENCY:STOP?")),
            len(values)
        )
        return np.stack((xdata, values))

    # ==== Commands ====

    @base.base_command
    def set_freqaxis(self, low, high):
        """Horizontal frequency axis.

        :param float low: lowest frequency [Hz]
        :param float high: highest frequency [Hz]
        :raises base.CommandError: if ``low`` is not strictly below ``high``
        """
        low = float(low)
        high = float(high)
        if not low < high:
            raise base.CommandError("Expected low < high")
        # self.raw_scpi(":frequency:start {:.6e}".format(low))
        # self.raw_scpi(":frequency:end {:.6e}".format(high))
        # NOTE Using (center, span) to avoid errors when new_low > old_high
        #      (the first write command ":frequency:start xxx" then fails)
        center = (low + high) / 2
        span = abs(high - low)
        self.raw_scpi(":FREQUENCY:CENTER {:.6e}".format(center))
        self.raw_scpi(":FREQUENCY:SPAN {:.6e}".format(span))

    @base.base_command
    def freqaxis(self):
        """Return tuple (low, high), frequency range [Hz]."""
        low = float(self.raw_scpi(":FREQUENCY:START?"))
        high = float(self.raw_scpi(":FREQUENCY:STOP?"))
        return low, high

    @base.base_command
    def set_amplitaxis(self, scale=None, offset=None):
        """Vertical amplitude axes - scale and offset. Set one or both.

        Not implemented yet, always raises ``NotImplementedError``.

        :param float scale: amplitude ??? [dBm]
        :param float offset: amplitude ??? [dBm]
        """
        # TODO Maybe this? Or maybe this is just display and device returns full range.
        # self.raw_scpi(":disp:win:trace:y:pdivision ...")  # Scale
        # self.raw_scpi(":disp:win:trace:y:rlevel ...")  # Reference level
        raise NotImplementedError

    @base.base_command
    def amplitaxis(self):
        """Return tuple (scale, offset) of active channel amplitude axis [dBm].

        Not implemented yet, always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    @base.base_command
    def set_tracking_generator(self, active):
        """Turn on/off output of the tracking generator.

        :param bool active: either True or False
        """
        # The lookup raises KeyError for anything that is not equal to
        # True/False, so no command is sent for unexpected values.
        self.raw_scpi(":OUTPUT:STATE " + {True: "1", False: "0"}[active])
        self._last["output_active"] = active

    def tracking_generator(self):
        """Return the tracking generator state last set through this object.

        The value comes from the local cache, the device is not queried.
        """
        return self._last["output_active"]

    @base.base_command
    def fetch_data(self):
        """Read all traces from last sweep. Data is stored in property ``data``."""
        for tr in self._trace_ids:
            self._last["trace"][tr] = self._fetch_trace(tr)

    @property
    def data(self):
        """Trace data of active trace in format (frequency[Hz], amplitude[dBm])."""
        return self._last["trace"][self.active_trace]

    @base.base_command
    def save_data(self):
        """Save last measured traces to file. Returns filepath.

        The file is created in ``SAVE_DIR`` and named after the current local
        time. Each trace contributes two columns (frequency, amplitude).

        :return: path of the written file as ``str``
        """
        self.SAVE_DIR.mkdir(parents=True, exist_ok=True)
        filename = "traces_{}.txt".format(time.strftime("%Y-%m-%d_%H-%M-%S"))
        filepath = self.SAVE_DIR / filename
        # Stack the (2, n) arrays of all traces below each other; the transpose
        # passed to savetxt then has one column per stacked row.
        stacked = np.concatenate([self._last["trace"][tr] for tr in self._trace_ids])
        header = " | ".join(["Tr{} freq[Hz] amplitude[dBm]".format(tr) for tr in self._trace_ids])
        np.savetxt(filepath, stacked.T, fmt="%.6e", header=header)
        return str(filepath)

    @base.compound_command
    def multi_scan(self, dc, ranges):
        """Step through several frequency ranges and emit one data file per range.

        For each range the frequency axis is set, the method waits one second,
        the cached traces are saved with ``save_data`` and the file is handed
        to ``dc.emit_datafile``.

        :param dc: object providing ``emit_datafile(filepath, name)``
        :param ranges: ``[[f1_min, f1_max], [f2_min, f2_max], ...]`` as a nested
            sequence or as a JSON string; the transposed layout
            ``[[f1_min, f2_min, ...], [f1_max, f2_max, ...]]`` is accepted too
        :raises base.CommandError: if ``ranges`` cannot be parsed into a
            two-dimensional numeric array with one axis of length 2
        """
        if isinstance(ranges, str):
            try:
                ranges = json.loads(ranges)
            except json.JSONDecodeError as err:
                raise base.CommandError(self._RANGES_FORMAT_HINT) from err
        try:
            ranges = np.array(ranges, dtype=float)
        except (TypeError, ValueError) as err:
            raise base.CommandError(self._RANGES_FORMAT_HINT) from err
        if ranges.ndim != 2 or 2 not in ranges.shape:
            raise base.CommandError(self._RANGES_FORMAT_HINT)

        # One (low, high) pair per row. The documented pair layout wins when
        # the input is 2x2 and therefore ambiguous.
        pairs = ranges if ranges.shape[1] == 2 else ranges.T

        basename = None
        for index, (low, high) in enumerate(pairs, start=1):
            self.set_freqaxis(low, high)
            time.sleep(1)
            # NOTE(review): save_data() writes the traces cached by the last
            # fetch_data() call and nothing visible here refreshes them after
            # retuning -- confirm whether a fetch_data() call is needed.
            filepath = self.save_data()
            if basename is None:
                # All files of one scan share the name of the first saved file.
                basename = pathlib.Path(filepath).stem
            dc.emit_datafile(filepath, f"{basename}_scan_{index:03d}.txt")
        self.log.info(f"Scan finished ({len(pairs)} points)")

    # ==== Methods/commands related to DeviceClient ====

    @property
    def active_trace(self):
        """Id of the trace returned by ``data`` and shown in the frontend."""
        return self._active_trace

    @active_trace.setter
    def active_trace(self, trace):
        self._active_trace = self._validate_traceid(trace)

    def update_frontend(self, device_client):
        """Emit a ``"graph"`` message with the active trace.

        Frequencies are converted from Hz to MHz for the x axis.

        :param device_client: object providing ``emit(event, payload)``
        """
        hertz, amplitude = self.data
        device_client.emit("graph", {
            "title": "Trace {}".format(self.active_trace),
            "x": (hertz / 1e6).tolist(),
            "y": amplitude.tolist(),
            "xlabel": "f [MHz]",
            "ylabel": "P [dBm]",
            "id": "graph_trace"
        })