Source code for device.base.visabase
import pyvisa
import pyvisa_py
import serial
import socket # Only beacause of socket.timeout exception
from . import base
# NOTE Debugging SCPI commands - Incorrect 'write' command passed to raw_scpi()
# usually triggers an error on the device (error can be then queried using
# e.g. :SYSTEM:ERROR?). However, query commands break the whole transmission
# because of a bug/bad implementation of pyvisa-py. The exceptions and log
# entries might be rather cryptic in this case.
[docs]
class VisaHardwareBase(base.HardwareBase):
"""Base class for communication with hardware.
Communication is based on VISA standard and SCPI commands. Derived classes
can implement convenient functions for user friendly and/or automatized
control of the device.
Required method overload:
__init__ -> call super().__init__ with your VISA 'resource_name'
Optional method overloads:
def to_defaultstate(self):
def clear_defaultstate(self):
"""
def __init__(self, resource_name, timeout=2000, baud_rate=9600, **kwargs):
"""Constructor.
:param str resource_name: VISA resource, can be found in device manual
:param int timeout: timeout for all I/O operations [ms]
Note: When debugging SCPI it may be useful to use longer timeout values.
Otherwise invalid queries may break the connection and require
you to disconnect & connect again (probably a bug in pyvisa-py).
"""
super().__init__(**kwargs)
self._resource_manager = pyvisa.ResourceManager()
self._resource_name = resource_name
self._device = None
self._timeout = timeout
self._baud_rate = baud_rate
def _connect(self):
"""Connect procedure using 'resource_name'."""
# No connection: Try to connect, i.e. open visa resource
self.log.debug("Trying VISA connection " + self._resource_name)
try:
self._device = self._resource_manager.open_resource(self._resource_name)
self._device.timeout = self._timeout
self._device.baud_rate = self._baud_rate
# Failed to connect
except (pyvisa.errors.VisaIOError, serial.SerialException):
self._device = None
return False
try:
self.log.info(self.raw_scpi("*IDN?").strip())
return True
except base.TransmissionError:
self._device.close()
self._device = None
return False
# Catching everything (see _disconnect).
# except Exception as e:
# self.log.error("VISA connect: An unexpected exception was raised")
# self.log.debug("Exception "+str(type(e))+" "+str(e))
# self._device = None
# return False
def _disconnect(self):
"""Disconnect procedure."""
try:
self._device.close()
# Catching everything because disconnect should not fail. Also there
# does not seem to be a specific exception from pyvisa when for example
# calling close() on failed connection.
except Exception as e:
self.log.error("VISA disconnect: An unexpected exception was raised")
self.log.debug("Exception "+str(type(e))+" "+str(e))
finally:
self._device = None
def _is_ready(self):
"""Connection check procedure."""
try:
self.raw_scpi("*IDN?")
return True
except base.TransmissionError:
return False
[docs]
@base.base_command
def raw_scpi(self, command, query_parse="none", parse_datatype="f", **kwargs):
"""Send SCPI command to device. Commands containing '?' are
automatically processed as queries - waiting for device response.
:param str command: SCPI command.
:param str query_parse: use 'none', 'ascii' or 'binary'
:param str parse_datatype: specify datatype of parsed values using
Format Characters from native module ``struct``.
:param dict kwargs: additional parameters for query_ascii_values() or
query_binary_values() from pyvisa.Resource
"""
if not isinstance(command, str):
raise TypeError("Expected str got {} instead".format(type(command)))
if ";" in command:
raise ValueError("Concatenating commands is forbidden (security - command injection)")
if query_parse not in ["none", "ascii", "binary"]:
raise ValueError("Expected query_parse='none', 'ascii' or 'binary'")
if self._status["debug_spam"]:
self.log.debug("SCPI command '{}'".format(command))
try:
if "?" in command:
if query_parse == "none":
return self._device.query(command)
elif query_parse == "ascii":
return self._device.query_ascii_values(
command, parse_datatype, **kwargs
)
elif query_parse == "binary":
# NOTE: Maybe should specify 'data_points' (length of data)
# when using binary, pyvisa-py throws warning sometimes
return self._device.query_binary_values(
command, parse_datatype, **kwargs
)
else:
self._device.write(command)
except pyvisa.errors.VisaIOError as e:
if e.abbreviation == "VI_ERROR_TMO":
self.log.debug("Possible cause 1: Syntax error in SCPI query command")
self.log.debug("Possible cause 2: Too many SCPI commands sent at once,"
" interlace with queries (e.g. *idn?) to wait for"
" the device to catch up or increase _device.timeout")
elif e.abbreviation == "VI_ERROR_IO":
self.log.debug("Possible cause 1: Cable disconnected or device turned off")
self.log.debug("Possible cause 2: Too short _device.timeout")
raise base.TransmissionError(str(e))
except socket.timeout:
raise base.TransmissionError("Device not responding")
except pyvisa_py.protocols.rpc.RPCError:
raise base.TransmissionError("Broken connection (xid mismatch)")
[docs]
class TestVisaHardwareBase(VisaHardwareBase):
"""Mockup class to simulate communication with VISA device. Use for testing
and dry-run of subclasses that are based on VisaHardwareBase.
"""
def __init__(self, resource_name, **kwargs):
super().__init__(**kwargs)
self.log.debug("Test -> {}".format(resource_name))
def _connect(self):
self.log.debug("Test -> _connect")
return True
def _disconnect(self):
self.log.debug("Test -> _disconnect")
return
def _is_ready(self):
self.log.debug("Test -> _is_ready")
return True
[docs]
@base.base_command
def raw_scpi(self, command, query_parse="none"):
if not isinstance(command, str):
raise TypeError("Expected str got {} instead".format(type(command)))
self.log.debug("Test -> SCPI command '{}'".format(command))
if query_parse not in ["none", "ascii", "binary"]:
raise ValueError("Expected query_parse='none', 'ascii' or 'binary'")
if command[-1] == "?":
return 0