Source code for device.spectral_dsa800

import time
import pathlib
import numpy as np
import json

from . import base


class SpectralAnalyser_DSA800(base.VisaHardwareBase):
    """Spectral analyser (resonance experiment).

    Model name: RIGOL DSA800

    Communication:
     - LAN, using VISA protocol and SCPI commands

    The most recently fetched traces are cached in ``self._last["trace"]``
    (keyed by trace id) and exposed through the ``data`` property for the
    currently active trace.
    """

    # Directory where ``save_data`` writes its text files.
    SAVE_DIR = pathlib.Path.home() / "saved_data"

    def __init__(self, ip_address, **kwargs):
        """Create the device object.

        :param str ip_address: address of the analyser, inserted into the
            VISA resource name ``TCPIP0::<ip_address>::INSTR``
        :param kwargs: forwarded unchanged to the base class constructor
        """
        super().__init__(
            resource_name="TCPIP0::" + ip_address + "::INSTR",
            timeout=20_000,  # Sometimes, long autocalibration happens
            **kwargs
        )
        # Cache of the last known device state.
        self._last = {
            "trace": {},
            "output_active": False,
        }
        self._trace_ids = [1, 2, 3]
        self._active_trace = 1

    # ==== Inherited abstract methods ====

    # def _safestate(self):
    #     pass

    def _readerror(self):
        """Read new error from device.

        :return: error message without the surrounding quotation marks, or
            ``None`` when the reported error code is ``0``
        """
        reply = self.raw_scpi(":SYSTEM:ERROR?").strip()
        # Split on the FIRST comma only: the message part may itself
        # contain commas, which must not break the unpacking.
        code, _, message = reply.partition(",")
        if code.strip() == "0":
            return None
        return message.strip().strip('"')  # Get rid of q. marks ""

    def _defaultstate(self):
        """Init device to a well defined state. (Auto called after connect.)"""
        # TODO Consider saving these settings as a preset and then only
        # send one command to load it. Sending this many commands is slow.
        # self.raw_scpi(":system:preset:type user1")
        # self.raw_scpi(":system:pon:type preset")  # [ONCE] Set after start-up
        # self.raw_scpi(":system:preset")  # [EACH TIME] or activate manually

        # Reset all settings and restrict non-remote access
        # self.raw_scpi(":system:communicate:brmt on")  # Maybe does not work
        # self.raw_scpi(":system:klock on,all")  # Is this necessary?
        # self.raw_scpi(":display:enable OFF")  # Speeds up all measurements

        # Make sure the tracking generator is turned off
        self.set_tracking_generator(False)

        # Set data format
        self.raw_scpi(":FORMAT:TRACE:DATA ASCII")  # ascii

        # Set axes and sweep
        self.raw_scpi(":UNIT:POWER DBM")

        # Fetch first data
        self.fetch_data()

    def _clear_defaultstate(self):
        """Clean-up device settings (i.e. keys lock).
        (Auto called before disconnect.)"""
        # self.raw_scpi(":system:klock off,all")
        pass

    # ==== Private methods and commands ====

    def _validate_traceid(self, trace):
        """Return ``trace`` converted to ``int`` if it is a known trace id.

        :param trace: trace id, anything accepted by ``int()``
        :raises base.CommandError: if the value cannot be converted to an
            integer or is not listed in ``self._trace_ids``
        """
        try:
            trace = int(trace)
        except (TypeError, ValueError):
            # Report non-numeric input the same way as an unknown id.
            trace = None
        if trace not in self._trace_ids:
            raise base.CommandError(
                "Invalid trace id, use one of {}".format(self._trace_ids))
        return trace

    @base.base_command
    def _fetch_trace(self, trace):
        """Return trace data as a numpy array, shape=(2, n).

        Row 0 is the frequency axis, row 1 the values reported by the device.

        :param int trace: id of trace
        """
        raw = self.raw_scpi(":TRACE:DATA? TRACE{:d}".format(trace)).strip()
        if raw.startswith("#"):
            # Skip the block header "#<k><k digits>": the character after
            # '#' gives the number of header digits that follow it.
            raw = raw[2 + int(raw[1]):]
        values = np.array([float(x) for x in raw.split(",")])
        # TODO Is this xaxis precise?
        freqs = np.linspace(
            float(self.raw_scpi(":FREQUENCY:START?")),
            float(self.raw_scpi(":FREQUENCY:STOP?")),
            len(values)
        )
        return np.stack((freqs, values))

    # ==== Commands ====

    @base.base_command
    def set_freqaxis(self, low, high):
        """Horizontal frequency axis.

        :param float low: lowest frequency [Hz]
        :param float high: highest frequency [Hz]
        :raises base.CommandError: unless ``low < high``
        """
        low = float(low)
        high = float(high)
        # "not low < high" (rather than "low >= high") also rejects NaN.
        if not low < high:
            raise base.CommandError("Expected low < high")
        # self.raw_scpi(":frequency:start {:.6e}".format(low))
        # self.raw_scpi(":frequency:end {:.6e}".format(high))
        # NOTE Using (center, span) to avoid errors when new_low > old_high
        # (the first write command ":frequency:start xxx" then fails)
        center = (low + high) / 2
        span = high - low
        self.raw_scpi(":FREQUENCY:CENTER {:.6e}".format(center))
        self.raw_scpi(":FREQUENCY:SPAN {:.6e}".format(span))

    @base.base_command
    def freqaxis(self):
        """Return tuple (low, high), frequency range [Hz]."""
        low = float(self.raw_scpi(":FREQUENCY:START?"))
        high = float(self.raw_scpi(":FREQUENCY:STOP?"))
        return low, high

    @base.base_command
    def set_amplitaxis(self, scale=None, offset=None):
        """Vertical amplitude axes - scale and offset. Set one or both.

        Not implemented yet; always raises ``NotImplementedError``.

        :param float scale: amplitude ??? [dBm]
        :param float offset: amplitude ??? [dBm]
        """
        # TODO Maybe this? Or maybe this is just display and device returns
        # full range.
        # self.raw_scpi(":disp:win:trace:y:pdivision ...")  # Scale
        # self.raw_scpi(":disp:win:trace:y:rlevel ...")  # Reference level
        raise NotImplementedError

    @base.base_command
    def amplitaxis(self):
        """Return tuple (scale, offset) of active channel amplitude axis [dBm].

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    @base.base_command
    def set_tracking_generator(self, active):
        """Turn on/off output of the tracking generator.

        :param bool active: either True or False
        :raises KeyError: if ``active`` is neither True nor False
        """
        self.raw_scpi(":OUTPUT:STATE " + {True: "1", False: "0"}[active])
        self._last["output_active"] = active

    def tracking_generator(self):
        """Return the tracking generator state last set via
        ``set_tracking_generator`` (cached value, the device is not queried).
        """
        return self._last["output_active"]

    @base.base_command
    def fetch_data(self):
        """Read all traces from last sweep.

        Data is stored in property ``data``.
        """
        for tr in self._trace_ids:
            self._last["trace"][tr] = self._fetch_trace(tr)

    @property
    def data(self):
        """Trace data of active trace in format (frequency[Hz], amplitude[dBm]).
        """
        return self._last["trace"][self.active_trace]

    @base.base_command
    def save_data(self):
        """Save last measured traces to file. Returns filepath.

        The file is written into ``SAVE_DIR`` and named after the current
        local time. Each trace contributes two columns (frequency, amplitude).
        """
        # parents=True so that a missing parent directory is not an error.
        self.SAVE_DIR.mkdir(parents=True, exist_ok=True)
        filename = "traces_{}.txt".format(time.strftime("%Y-%m-%d_%H-%M-%S"))
        filepath = self.SAVE_DIR / filename
        # Each cached trace has shape (2, n); stacking them along axis 0 and
        # transposing gives one row per point, two columns per trace.
        table = np.concatenate(
            [self._last["trace"][tr] for tr in self._trace_ids])
        header = " | ".join(
            "Tr{} freq[Hz] amplitude[dBm]".format(tr)
            for tr in self._trace_ids)
        np.savetxt(filepath, table.T, fmt="%.6e", header=header)
        return str(filepath)

    @base.compound_command
    def multi_scan(self, dc, ranges):
        """Set several frequency ranges one after another and save each one.

        For every range the frequency axis is set, the method waits one
        second, saves the cached traces with ``save_data`` and hands the
        file to ``dc.emit_datafile``.

        :param dc: object providing ``emit_datafile(filepath, name)``
        :param ranges: ``[[f1_min, f1_max], [f2_min, f2_max], ...]`` [Hz],
            either as a nested sequence or as a JSON string. The transposed
            layout ``[[f1_min, f2_min, ...], [f1_max, f2_max, ...]]`` is
            accepted as well when it is unambiguous.
        :raises base.CommandError: if ``ranges`` has a wrong format
        """
        format_hint = "Use format [[f1_min, f1_max], [f2_min, f2_max], ...]"
        if isinstance(ranges, str):
            try:
                ranges = json.loads(ranges)
            except json.JSONDecodeError as err:
                raise base.CommandError(format_hint) from err
        try:
            ranges = np.array(ranges, dtype=float)
        except (TypeError, ValueError) as err:
            raise base.CommandError(format_hint) from err
        if ranges.ndim != 2 or 2 not in ranges.shape:
            raise base.CommandError(format_hint)
        # Normalise to shape (n_ranges, 2). Rows are taken as (min, max)
        # pairs whenever possible, so a 2x2 input follows the documented
        # format; only a (2, n) input with n != 2 is transposed.
        if ranges.shape[1] != 2:
            ranges = ranges.T

        stem = None
        for i, (low, high) in enumerate(ranges):
            self.set_freqaxis(low, high)
            time.sleep(1)
            # NOTE(review): save_data() writes the cached traces and
            # fetch_data() is not called here - confirm that something else
            # refreshes the cache during the sleep above.
            filepath = self.save_data()
            if stem is None:
                # All emitted files share the name of the first saved file.
                stem = pathlib.PurePath(filepath).stem
            dc.emit_datafile(filepath, f"{stem}_scan_{i+1:03d}.txt")
        self.log.info(f"Scan finished ({len(ranges)} points)")

    # ==== Methods/commands related to DeviceClient ====

    @property
    def active_trace(self):
        """Id of the trace returned by ``data``; validated on assignment."""
        return self._active_trace

    @active_trace.setter
    def active_trace(self, trace):
        self._active_trace = self._validate_traceid(trace)

    def update_frontend(self, device_client):
        """Emit the active trace as a ``"graph"`` message.

        :param device_client: object providing ``emit(name, payload)``
        """
        hertz, amplitude = self.data
        device_client.emit("graph", {
            "title": "Trace {}".format(self.active_trace),
            "x": (hertz / 1e6).tolist(),  # Hz -> MHz
            "y": amplitude.tolist(),
            "xlabel": "f [MHz]",
            "ylabel": "P [dBm]",
            "id": "graph_trace"
        })