Source code for device.oscope_mso5000

import time
import numpy as np
import pathlib

from . import base


# TODO: Trigger?

[docs] class Oscope_MSO5000(base.VisaHardwareBase): """Oscilloscope (interferometry experiment). Model name: RIGOL MSO5000 Communication: - LAN, using VISA protocol and SCPI commands """ SAVE_DIR = pathlib.Path.home() / "saved_data" def __init__(self, ip_address, **kwargs): super().__init__( resource_name="TCPIP0::" + ip_address + "::INSTR", timeout=5_000, # Some oscope operations require a few seconds **kwargs ) self._channel_ids = [] self._activechannel = 1 self._last = { "waveform": {}, "waveform_fft": {}, } # ==== Inherited abstract methods ==== # def _safestate(self): # pass def _readerror(self): """Read new error from device.""" # code, err = self.raw_scpi(":system:error?").strip().split(",") code = self.raw_scpi(":SYSTEM:ERROR?").strip() # The documentation is lying :( if code != "0": # return err[1:-1] # Get rid of q. marks "" return code else: return None def _defaultstate(self): """Init device to a well defined state. (Auto called after connect.)""" # Setup private variables # self._channel_ids = list(range(1, 1 + int(self.raw_scpi(":system:ramount?")))) self._channel_ids = list(range(1, 1 + 3)) self._activechannel = 1 # TODO Consider saving these settings as a preset and then only # send one command to load it. Sending this many commands is slow. # Reset all settings and restrict non-remote access # self.raw_scpi(":system:reset") # self.raw_scpi(":system:locked on") # Set correct probe ratio (real meassured volts : volts at probe cable) for ch in self._channel_ids: self.raw_scpi(":CHANNEL{}:PROBE 1".format(ch)) if int(self.raw_scpi(":CHANNEL{}:DISPLAY?".format(ch))) == 0: self.raw_scpi(":CHANNEL{}:DISPLAY ON".format(ch)) # Time axis mode self.raw_scpi(":TIMEBASE:MODE MAIN") # Fast Fourier transform # for ch in self._channel_ids: # prefix = ":math{}".format(ch) # if self.raw_scpi(prefix + ":operator?").strip() != "FFT": # self.raw_scpi(prefix + ":operator fft") # self.raw_scpi(prefix + ":fft:unit db") # self.raw_scpi(prefix + ":fft:source channel{}".format(ch)) # if int(self.raw_scpi(prefix + ":display?")) == 0: # self.raw_scpi(prefix + ":display on") # self.raw_scpi("*idn?") # Arbitrary query to wait for operations to finish self.raw_scpi(":RUN") time.sleep(3) self.fetch_data() def _clear_defaultstate(self): """Clean-up device settings (i.e. keys lock). (Auto called before disconnect.)""" # self.raw_scpi(":system:locked off") pass # ==== Private methods and commands ==== def _validate_channelid(self, channel): channel = int(channel) if channel not in self._channel_ids: raise base.CommandError("Invalid channel id, use one of {}".format(self._channel_ids)) return channel @base.base_command def _fetch_waveform(self, source): """Return waveform data as a numpy array, shape=(2, n).""" def parse_preamble(): p = self.raw_scpi(":WAVEFORM:PREAMBLE?", "ascii", "f") points = int(p[2]) # Number of points xinfo = {"incr": p[4], "orig": p[5], "ref": p[6]} yinfo = {"incr": p[7], "orig": p[8], "ref": p[9]} return points, xinfo, yinfo # TODO Consider moving folowing two lines to defaultstate() self.raw_scpi(":WAVEFORM:MODE NORMAL") self.raw_scpi(":WAVEFORM:FORMAT BYTE") # Binary format => faster query than ascii self.raw_scpi(":WAVEFORM:SOURCE {}".format(source)) points, xax, yax = parse_preamble() # Read data as unsigned bytes 0-255 data = self.raw_scpi(":WAVEFORM:DATA?", "binary", "B", container=np.array) if not "math" in source: # source from channels xdata = (np.arange(points) - xax["ref"]) * xax["incr"] + xax["orig"] data = (data - yax["ref"] - yax["orig"]) * yax["incr"] else: # source from math (FFT) xdata = (np.arange(points) - xax["ref"]) * xax["incr"] + max(xax["orig"], 0) data = (data - yax["ref"]) * yax["incr"] return np.stack((xdata, data)) # ==== Commands ====
[docs] @base.base_command def set_timeaxis(self, scale=None, offset=None): """Horizontal scale and offset of time axis. Set one or both. :param float scale: time interval per screen square [s] :param float offset: offset from screen center [s] Note the dependency - a change in scale may affect offset value. """ if scale is not None: scale = float(scale) # Can raise ValueError self.raw_scpi(":TIMEBASE:SCALE {:.6e}".format(scale)) if offset is not None: offset = float(offset) # Can raise ValueError self.raw_scpi(":TIMEBASE:OFFSET {:.6e}".format(offset))
[docs] @base.base_command def timeaxis(self): """Return time axis settings as a tuple (scale, offset), units: s. """ scale = float(self.raw_scpi(":TIMEBASE:SCALE?")) offset = float(self.raw_scpi(":TIMEBASE:OFFSET?")) return scale, offset
[docs] @base.base_command def set_channelaxis(self, scale=None, offset=None): """Vertical scale and offset of active channel. Set one or both. :param float scale: signal magnitude per screen square [V] :param float offset: offset from screen center [V] Note the dependency - a change in scale may affect offset value. """ # channel = self.active_ch for channel in self._channel_ids: if scale is not None: scale = float(scale) # Can raise ValueError self.raw_scpi(":CHANNEL{:d}:SCALE {:.6e}".format(channel, scale)) if offset is not None: offset = float(offset) # Can raise ValueError self.raw_scpi(":CHANNEL{:d}:OFFSET {:.6e}".format(channel, offset))
[docs] @base.base_command def channelaxis(self): """Return tuple (scale, offset) for active channel axis [V]. """ channel = self.active_ch scale = float(self.raw_scpi(":CHANNEL{:d}:SCALE?".format(channel))) offset = float(self.raw_scpi(":CHANNEL{:d}:OFFSET?".format(channel))) return scale, offset
[docs] @base.base_command def set_freqaxis(self, low, high): """FFT: Horizontal frequency axis of active channel. :param float low: lowest frequency [Hz] :param float high: highest frequency [Hz] """ prefix = ":MATH{}:FFT".format(self.active_ch) low = float(low) high = float(high) self.raw_scpi(prefix + ":FREQ:START {:.6e}".format(low)) self.raw_scpi(prefix + ":FREQ:END {:.6e}".format(high)) scale = abs(high - low) / 12 center = (low + high) / 2 self.raw_scpi(prefix + ":HSCALE {:.6e}".format(scale)) self.raw_scpi(prefix + ":HCENTER {:.6e}".format(center))
[docs] @base.base_command def freqaxis(self): """FFT: Return tuple (low, high), frequency range of active channel [Hz]. """ prefix = ":MATH{}:FFT".format(self.active_ch) low = float(self.raw_scpi(prefix + ":FREQ:START?")) high = float(self.raw_scpi(prefix + ":FREQ:END?")) return low, high
[docs] @base.base_command def set_amplitaxis(self, scale=None, offset=None): """FFT: Amplitude scale and offset of active channel. Set one or both. :param float scale: amplitude per screen square [dBV] :param float offset: amplitude offset from screen center [dBV] """ prefix = ":math{}:fft".format(self.active_ch) if scale is not None: scale = float(scale) self.raw_scpi(prefix + ":SCALE {:.6e}".format(scale)) if offset is not None: offset = float(offset) self.raw_scpi(prefix + ":OFFSET {:.6e}".format(offset))
[docs] @base.base_command def amplitaxis(self): """FFT: Return tuple (scale, offset) of active channel amplitude axis [dBV]. """ prefix = ":MATH{}:FFT".format(self.active_ch) scale = float(self.raw_scpi(prefix + ":SCALE?")) offset = float(self.raw_scpi(prefix + ":OFFSET?")) return scale, offset
[docs] @base.base_command def set_fftwindow(self, window): """FFT: Set window function for active channel. :param str window: One of 'RECT', 'BLAC', 'HANN', 'HAMM', 'FLAT', 'TRI'. """ W = ["RECT", "BLAC", "HANN", "HAMM", "FLAT", "TRI"] if window not in W: raise base.CommandError("Use one of {}".format(W)) self.raw_scpi(":MATH{:d}:FFT:WINDOW {}".format(self.active_ch, window))
[docs] @base.base_command def fftwindow(self): """FFT: Return window function that is used for active channel. """ return self.raw_scpi(":MATH{:d}:FFT:WINDOW?".format(self.active_ch))
[docs] @base.base_command def fetch_data(self): # (self, channels=None): """Stop oscilloscope and read captured signal-waveform and fft-waveform from specified channels. Data is stored in properties ``data`` and ``data_fft``. """ channels = None if channels is None or channels == "": channels = self._channel_ids elif isinstance(channels, str): channels = (self._validate_channelid(int(ch)) for ch in channels.split(",")) self.raw_scpi(":STOP") for ch in channels: self._last["waveform"][ch] = self._fetch_waveform("channel" + str(ch)) # self._last["waveform_fft"][ch] = self._fetch_waveform("math" + str(ch)) time.sleep(1) self.raw_scpi(":run")
@property def data(self): """Signal data fetched from active channel (time[s], signal[V]). """ return self._last["waveform"][self.active_ch] @property def data_fft(self): """FFT data fetched from active channel (frequency[Hz], amplitude[dBV]). """ return self._last["waveform_fft"][self.active_ch]
[docs] @base.base_command def save_data(self): # (self, channels=None): """Save last fetched signal-waveforms and fft-waveforms. A list of channels can be specified, otherwise all channels are used. Returns tuple of filepaths (signal-waveforms-file, fft-waveforms-file). :param channels: iterable or comma separated ints in string, use None to save all availible channels """ channels = None if channels is None or channels == "": channels = self._channel_ids elif isinstance(channels, str): channels = (self._validate_channelid(int(ch)) for ch in channels.split(",")) for ch in channels: if ch not in self._last["waveform"]: # or ch not in self._last["waveform_fft"]: raise base.CommandError("No data to save") self.SAVE_DIR.mkdir(exist_ok=True) filepath = "{}_{{}}_{}.txt".format(type(self).__name__, time.strftime("%Y-%m-%d_%H-%M-%S")) filepath = tuple( self.SAVE_DIR / filepath.replace(" ", "_").replace(":", "-").format(x) for x in ("channel", "fft") ) # Reason for pad: When "zoomed out", FFT waveforms may consist of # less than 1200 points. Pad them to 1200 before concat. def pad_concat(arrays, size): return np.concatenate([ np.pad(a, ((0, 0), (0, size - a.shape[1])), constant_values=np.nan) for a in arrays ]) x = np.concatenate([self._last["waveform"][ch] for ch in channels]) h = " | ".join(["Ch{} time[s] signal[V]".format(ch) for ch in channels]) np.savetxt(filepath[0], x.T, fmt="%.6e", header=h) self.log.info("Saved signal-waveforms to '{}'".format(filepath[0])) # x = pad_concat((self._last["waveform_fft"][ch] for ch in channels), 1200) # h = " | ".join(["Ch{} freq[Hz] amplitude[dBV]".format(ch) for ch in channels]) # np.savetxt(filepath[1], x.T, fmt="%.6e", header=h) # self.log.info("Saved fft-waveforms to '{}'".format(filepath[1])) return filepath[0] # tuple(str(f) for f in filepath)
# ==== Methods/commands related to DeviceClient ==== @property def active_ch(self): """Channel displayed at frontend.""" return self._activechannel @active_ch.setter def active_ch(self, channel): """Set which channel is displayed at frontend. :param int channel: Channel id number. """ self._activechannel = self._validate_channelid(channel) # @base.idle_command # def _idleupdate_waveform(self, ch): # self.waveform(ch) # @base.idle_command # def _idleupdate_waveform_fft(self, ch): # self.waveform_fft(ch)
[docs] def update_frontend(self, device_client): # NOTE The oscilloscope (DS1074Z tested, MSO5000 uknown) does not # seem to be capable of real time data streaming. It is not # uncommon for queries such as ":wav:data?" to consume 100ms-1s. # It seems to be caued by the oscilloscope itself. for ch in self._channel_ids: self.active_ch = ch timedata, volts = self.data device_client.emit("graph", { "title": "Channel {}".format(self.active_ch), "x": timedata.tolist(), "y": volts.tolist(), "xlabel": "t [s]", "ylabel": "U [V]", "id": "graph_channel{}".format(ch) })
# hertz, amplitude = self.data_fft # device_client.emit("graph", { # "title": "Channel {} FFT".format(self.active_ch), # "x": hertz.tolist(), # "y": amplitude.tolist(), # "xlabel": "f [Hz]", # "ylabel": "U [dBV]", # "id": "graph_fft" # })