import time
import numpy as np
import pathlib
from . import base
# TODO: Add support for trigger configuration?
class Oscope_MSO5000(base.VisaHardwareBase):
    """Oscilloscope (interferometry experiment).

    Model name: RIGOL MSO5000

    Communication:
        - LAN, using VISA protocol and SCPI commands

    Only time-domain waveforms are acquired at the moment. Fetching of FFT
    (math channel) waveforms is disabled, so nothing is ever stored for
    ``data_fft``.
    """

    # Directory where ``save_data`` writes its text files
    SAVE_DIR = pathlib.Path.home() / "saved_data"

    def __init__(self, ip_address, **kwargs):
        """Prepare the VISA resource for the oscilloscope at ``ip_address``.

        :param str ip_address: address used to build the VISA resource name
        :param kwargs: passed on unchanged to the base class
        """
        super().__init__(
            resource_name="TCPIP0::" + ip_address + "::INSTR",
            timeout=5_000,  # Some oscope operations require a few seconds
            **kwargs
        )
        self._channel_ids = []  # Filled in by _defaultstate()
        self._activechannel = 1
        # Last fetched waveforms, keyed by channel id
        self._last = {
            "waveform": {},
            "waveform_fft": {},
        }

    # ==== Inherited abstract methods ====

    def _readerror(self):
        """Read new error from device.

        :return: stripped reply to ``:SYSTEM:ERROR?``, or None if it is "0"
        """
        # The whole reply is treated as the error code. Splitting it into
        # a "code,message" pair as the documentation describes is disabled
        # (original note: "The documentation is lying").
        code = self.raw_scpi(":SYSTEM:ERROR?").strip()
        return None if code == "0" else code

    def _defaultstate(self):
        """Init device to a well defined state. (Auto called after connect.)

        Sets probe ratio 1 and turns the display on for every channel,
        selects the main time base mode, starts the acquisition and fetches
        the first waveforms.
        """
        # Setup private variables
        # NOTE: The channel count is hard-coded to 3; querying it from the
        # device (":system:ramount?") is disabled.
        self._channel_ids = list(range(1, 1 + 3))
        self._activechannel = 1

        # TODO Consider saving these settings as a preset and then only
        # send one command to load it. Sending this many commands is slow.

        # NOTE: Resetting all settings (":system:reset") and restricting
        # non-remote access (":system:locked on") are disabled.

        # Set correct probe ratio (real measured volts : volts at probe cable)
        for ch in self._channel_ids:
            self.raw_scpi(":CHANNEL{}:PROBE 1".format(ch))
            if int(self.raw_scpi(":CHANNEL{}:DISPLAY?".format(ch))) == 0:
                self.raw_scpi(":CHANNEL{}:DISPLAY ON".format(ch))

        # Time axis mode
        self.raw_scpi(":TIMEBASE:MODE MAIN")

        # NOTE: Fast Fourier transform setup of the math channels
        # (":math<n>:operator fft", ":math<n>:fft:unit db",
        # ":math<n>:fft:source channel<n>", ":math<n>:display on")
        # is disabled.

        self.raw_scpi(":RUN")
        # Presumably gives the scope time to capture a trace before the
        # first fetch - TODO confirm the required delay
        time.sleep(3)
        self.fetch_data()

    def _clear_defaultstate(self):
        """Clean-up device settings (i.e. keys lock). (Auto called before disconnect.)"""
        # Nothing to undo: unlocking (":system:locked off") is disabled
        # together with the locking in _defaultstate().
        pass

    # ==== Private methods and commands ====

    def _validate_channelid(self, channel):
        """Return ``channel`` converted to int if it is a known channel id.

        :param channel: anything accepted by ``int()``
        :raises base.CommandError: if the id is not in ``self._channel_ids``
        :raises ValueError: if ``channel`` cannot be converted to int
        """
        channel = int(channel)
        if channel not in self._channel_ids:
            raise base.CommandError("Invalid channel id, use one of {}".format(self._channel_ids))
        return channel

    def _resolve_channels(self, channels):
        """Return the channel selection as a list of validated channel ids.

        :param channels: iterable or comma separated ints in string,
            None or empty string selects all available channels
        :raises base.CommandError: if any id is not a known channel id
        """
        if channels is None or channels == "":
            return list(self._channel_ids)
        if isinstance(channels, str):
            channels = channels.split(",")
        # A list (not a generator) so that callers can iterate repeatedly
        return [self._validate_channelid(ch) for ch in channels]

    @base.base_command
    def _fetch_waveform(self, source):
        """Return waveform data as a numpy array, shape=(2, n).

        Row 0 holds the x axis values, row 1 the converted samples.

        :param str source: waveform source, e.g. "channel1" or "math1"
        """
        # TODO Consider moving following two lines to defaultstate()
        self.raw_scpi(":WAVEFORM:MODE NORMAL")
        self.raw_scpi(":WAVEFORM:FORMAT BYTE")  # Binary format => faster query than ascii
        self.raw_scpi(":WAVEFORM:SOURCE {}".format(source))

        # Preamble fields used: [2] number of points, [4:7] x increment,
        # origin, reference and [7:10] y increment, origin, reference
        preamble = self.raw_scpi(":WAVEFORM:PREAMBLE?", "ascii", "f")
        points = int(preamble[2])
        xax = {"incr": preamble[4], "orig": preamble[5], "ref": preamble[6]}
        yax = {"incr": preamble[7], "orig": preamble[8], "ref": preamble[9]}

        # Read data as unsigned bytes 0-255
        data = self.raw_scpi(":WAVEFORM:DATA?", "binary", "B", container=np.array)

        steps = (np.arange(points) - xax["ref"]) * xax["incr"]
        if "math" not in source:  # source from channels
            xdata = steps + xax["orig"]
            data = (data - yax["ref"] - yax["orig"]) * yax["incr"]
        else:  # source from math (FFT)
            # A negative x origin is replaced by 0 for math sources
            xdata = steps + max(xax["orig"], 0)
            data = (data - yax["ref"]) * yax["incr"]
        return np.stack((xdata, data))

    # ==== Commands ====

    @base.base_command
    def set_timeaxis(self, scale=None, offset=None):
        """Horizontal scale and offset of time axis. Set one or both.

        :param float scale: time interval per screen square [s]
        :param float offset: offset from screen center [s]

        Note the dependency - a change in scale may affect offset value.
        """
        if scale is not None:
            scale = float(scale)  # Can raise ValueError
            self.raw_scpi(":TIMEBASE:SCALE {:.6e}".format(scale))
        if offset is not None:
            offset = float(offset)  # Can raise ValueError
            self.raw_scpi(":TIMEBASE:OFFSET {:.6e}".format(offset))

    @base.base_command
    def timeaxis(self):
        """Return time axis settings as a tuple (scale, offset), units: s."""
        scale = float(self.raw_scpi(":TIMEBASE:SCALE?"))
        offset = float(self.raw_scpi(":TIMEBASE:OFFSET?"))
        return scale, offset

    @base.base_command
    def set_channelaxis(self, scale=None, offset=None):
        """Vertical scale and offset of all channels. Set one or both.

        The same values are written to every channel, not only to the
        active one.

        :param float scale: signal magnitude per screen square [V]
        :param float offset: offset from screen center [V]

        Note the dependency - a change in scale may affect offset value.
        """
        # Convert once, before any command is sent
        if scale is not None:
            scale = float(scale)  # Can raise ValueError
        if offset is not None:
            offset = float(offset)  # Can raise ValueError

        for channel in self._channel_ids:
            if scale is not None:
                self.raw_scpi(":CHANNEL{:d}:SCALE {:.6e}".format(channel, scale))
            if offset is not None:
                self.raw_scpi(":CHANNEL{:d}:OFFSET {:.6e}".format(channel, offset))

    @base.base_command
    def channelaxis(self):
        """Return tuple (scale, offset) for active channel axis [V]."""
        channel = self.active_ch
        scale = float(self.raw_scpi(":CHANNEL{:d}:SCALE?".format(channel)))
        offset = float(self.raw_scpi(":CHANNEL{:d}:OFFSET?".format(channel)))
        return scale, offset

    @base.base_command
    def set_freqaxis(self, low, high):
        """FFT: Horizontal frequency axis of active channel.

        :param float low: lowest frequency [Hz]
        :param float high: highest frequency [Hz]
        """
        prefix = ":MATH{}:FFT".format(self.active_ch)
        low = float(low)
        high = float(high)
        self.raw_scpi(prefix + ":FREQ:START {:.6e}".format(low))
        self.raw_scpi(prefix + ":FREQ:END {:.6e}".format(high))

        # 12 is presumably the number of horizontal screen squares
        # - TODO confirm for this model
        scale = abs(high - low) / 12
        center = (low + high) / 2
        self.raw_scpi(prefix + ":HSCALE {:.6e}".format(scale))
        self.raw_scpi(prefix + ":HCENTER {:.6e}".format(center))

    @base.base_command
    def freqaxis(self):
        """FFT: Return tuple (low, high), frequency range of active channel [Hz]."""
        prefix = ":MATH{}:FFT".format(self.active_ch)
        low = float(self.raw_scpi(prefix + ":FREQ:START?"))
        high = float(self.raw_scpi(prefix + ":FREQ:END?"))
        return low, high

    @base.base_command
    def set_amplitaxis(self, scale=None, offset=None):
        """FFT: Amplitude scale and offset of active channel. Set one or both.

        :param float scale: amplitude per screen square [dBV]
        :param float offset: amplitude offset from screen center [dBV]
        """
        prefix = ":math{}:fft".format(self.active_ch)
        if scale is not None:
            scale = float(scale)  # Can raise ValueError
            self.raw_scpi(prefix + ":SCALE {:.6e}".format(scale))
        if offset is not None:
            offset = float(offset)  # Can raise ValueError
            self.raw_scpi(prefix + ":OFFSET {:.6e}".format(offset))

    @base.base_command
    def amplitaxis(self):
        """FFT: Return tuple (scale, offset) of active channel amplitude axis [dBV]."""
        prefix = ":MATH{}:FFT".format(self.active_ch)
        scale = float(self.raw_scpi(prefix + ":SCALE?"))
        offset = float(self.raw_scpi(prefix + ":OFFSET?"))
        return scale, offset

    @base.base_command
    def set_fftwindow(self, window):
        """FFT: Set window function for active channel.

        :param str window: One of 'RECT', 'BLAC', 'HANN', 'HAMM', 'FLAT', 'TRI'.
        :raises base.CommandError: if ``window`` is not one of the above
        """
        W = ["RECT", "BLAC", "HANN", "HAMM", "FLAT", "TRI"]
        if window not in W:
            raise base.CommandError("Use one of {}".format(W))
        self.raw_scpi(":MATH{:d}:FFT:WINDOW {}".format(self.active_ch, window))

    @base.base_command
    def fftwindow(self):
        """FFT: Return window function that is used for active channel.

        The device reply is returned as received.
        """
        return self.raw_scpi(":MATH{:d}:FFT:WINDOW?".format(self.active_ch))

    @base.base_command
    def fetch_data(self, channels=None):
        """Stop oscilloscope, read captured signal-waveforms from specified
        channels and start the oscilloscope again. Data is stored in
        property ``data``.

        Fetching of fft-waveforms (``data_fft``) is currently disabled.

        :param channels: iterable or comma separated ints in string,
            use None to fetch all available channels
        :raises base.CommandError: if any id is not a known channel id
        """
        channels = self._resolve_channels(channels)

        self.raw_scpi(":STOP")
        try:
            for ch in channels:
                self._last["waveform"][ch] = self._fetch_waveform("channel" + str(ch))
        finally:
            # Resume the acquisition even if a transfer failed, so that
            # the oscilloscope is not left in the stopped state
            time.sleep(1)
            self.raw_scpi(":run")

    @property
    def data(self):
        """Signal data fetched from active channel (time[s], signal[V]).

        Raises KeyError if nothing has been fetched for the active channel.
        """
        return self._last["waveform"][self.active_ch]

    @property
    def data_fft(self):
        """FFT data fetched from active channel (frequency[Hz], amplitude[dBV]).

        Raises KeyError if nothing is stored for the active channel, which
        is always the case while fetching of fft-waveforms is disabled.
        """
        return self._last["waveform_fft"][self.active_ch]

    @base.base_command
    def save_data(self, channels=None):
        """Save last fetched signal-waveforms to a text file in ``SAVE_DIR``.
        A list of channels can be specified, otherwise all channels are
        used. Returns path of the written file.

        Saving of fft-waveforms is currently disabled.

        :param channels: iterable or comma separated ints in string,
            use None to save all available channels
        :raises base.CommandError: if any id is not a known channel id or
            if there is no fetched waveform for a selected channel
        """
        channels = self._resolve_channels(channels)
        waveforms = self._last["waveform"]
        if not channels or any(ch not in waveforms for ch in channels):
            raise base.CommandError("No data to save")

        self.SAVE_DIR.mkdir(parents=True, exist_ok=True)
        # "<class name>_{}_<timestamp>.txt", the placeholder is the data kind
        template = "{}_{{}}_{}.txt".format(type(self).__name__, time.strftime("%Y-%m-%d_%H-%M-%S"))
        template = template.replace(" ", "_").replace(":", "-")
        filepath = self.SAVE_DIR / template.format("channel")

        # Rows (time, signal) of all channels are stacked and written as
        # columns; requires the same number of points in every waveform
        x = np.concatenate([waveforms[ch] for ch in channels])
        h = " | ".join(["Ch{} time[s] signal[V]".format(ch) for ch in channels])
        np.savetxt(filepath, x.T, fmt="%.6e", header=h)
        self.log.info("Saved signal-waveforms to '{}'".format(filepath))

        # NOTE: When fft-waveforms are enabled again, they may consist of
        # less than 1200 points when "zoomed out" and have to be padded
        # (e.g. with NaN) to a common length before concatenation.
        return filepath

    # ==== Methods/commands related to DeviceClient ====

    @property
    def active_ch(self):
        """Channel displayed at frontend."""
        return self._activechannel

    @active_ch.setter
    def active_ch(self, channel):
        """Set which channel is displayed at frontend.

        :param int channel: Channel id number.
        :raises base.CommandError: if the id is not a known channel id
        """
        self._activechannel = self._validate_channelid(channel)

    def update_frontend(self, device_client):
        """Emit a "graph" event with the last fetched waveform of every channel.

        The active channel is left untouched.

        :param device_client: object providing ``emit(event, payload)``
        :raises KeyError: if no waveform has been fetched for some channel
        """
        # NOTE The oscilloscope (DS1074Z tested, MSO5000 unknown) does not
        # seem to be capable of real time data streaming. It is not
        # uncommon for queries such as ":wav:data?" to consume 100ms-1s.
        # It seems to be caused by the oscilloscope itself.
        for ch in self._channel_ids:
            # Read the stored waveform directly instead of switching
            # active_ch, which would override the selected channel
            timedata, volts = self._last["waveform"][ch]
            device_client.emit("graph", {
                "title": "Channel {}".format(ch),
                "x": timedata.tolist(),
                "y": volts.tolist(),
                "xlabel": "t [s]",
                "ylabel": "U [V]",
                "id": "graph_channel{}".format(ch)
            })
        # NOTE: Emitting of the FFT graph (id "graph_fft", labels "f [Hz]"
        # and "U [dBV]") is disabled together with the FFT acquisition.