Source code for device.base.visabase

import pyvisa
import pyvisa_py
import serial

import socket  # Only beacause of socket.timeout exception

from . import base


# NOTE Debugging SCPI commands - Incorrect 'write' command passed to raw_scpi()
#      usually triggers an error on the device (error can be then queried using
#      e.g. :SYSTEM:ERROR?). However, query commands break the whole transmission
#      because of a bug/bad implementation of pyvisa-py. The exceptions and log
#      entries might be rather cryptic in this case.

[docs] class VisaHardwareBase(base.HardwareBase): """Base class for communication with hardware. Communication is based on VISA standard and SCPI commands. Derived classes can implement convenient functions for user friendly and/or automatized control of the device. Required method overload: __init__ -> call super().__init__ with your VISA 'resource_name' Optional method overloads: def to_defaultstate(self): def clear_defaultstate(self): """ def __init__(self, resource_name, timeout=2000, baud_rate=9600, **kwargs): """Constructor. :param str resource_name: VISA resource, can be found in device manual :param int timeout: timeout for all I/O operations [ms] Note: When debugging SCPI it may be useful to use longer timeout values. Otherwise invalid queries may break the connection and require you to disconnect & connect again (probably a bug in pyvisa-py). """ super().__init__(**kwargs) self._resource_manager = pyvisa.ResourceManager() self._resource_name = resource_name self._device = None self._timeout = timeout self._baud_rate = baud_rate def _connect(self): """Connect procedure using 'resource_name'.""" # No connection: Try to connect, i.e. open visa resource self.log.debug("Trying VISA connection " + self._resource_name) try: self._device = self._resource_manager.open_resource(self._resource_name) self._device.timeout = self._timeout self._device.baud_rate = self._baud_rate # Failed to connect except (pyvisa.errors.VisaIOError, serial.SerialException): self._device = None return False try: self.log.info(self.raw_scpi("*IDN?").strip()) return True except base.TransmissionError: self._device.close() self._device = None return False # Catching everything (see _disconnect). # except Exception as e: # self.log.error("VISA connect: An unexpected exception was raised") # self.log.debug("Exception "+str(type(e))+" "+str(e)) # self._device = None # return False def _disconnect(self): """Disconnect procedure.""" try: self._device.close() # Catching everything because disconnect should not fail. Also there # does not seem to be a specific exception from pyvisa when for example # calling close() on failed connection. except Exception as e: self.log.error("VISA disconnect: An unexpected exception was raised") self.log.debug("Exception "+str(type(e))+" "+str(e)) finally: self._device = None def _is_ready(self): """Connection check procedure.""" try: self.raw_scpi("*IDN?") return True except base.TransmissionError: return False
[docs] @base.base_command def raw_scpi(self, command, query_parse="none", parse_datatype="f", **kwargs): """Send SCPI command to device. Commands containing '?' are automatically processed as queries - waiting for device response. :param str command: SCPI command. :param str query_parse: use 'none', 'ascii' or 'binary' :param str parse_datatype: specify datatype of parsed values using Format Characters from native module ``struct``. :param dict kwargs: additional parameters for query_ascii_values() or query_binary_values() from pyvisa.Resource """ if not isinstance(command, str): raise TypeError("Expected str got {} instead".format(type(command))) if ";" in command: raise ValueError("Concatenating commands is forbidden (security - command injection)") if query_parse not in ["none", "ascii", "binary"]: raise ValueError("Expected query_parse='none', 'ascii' or 'binary'") if self._status["debug_spam"]: self.log.debug("SCPI command '{}'".format(command)) try: if "?" in command: if query_parse == "none": return self._device.query(command) elif query_parse == "ascii": return self._device.query_ascii_values( command, parse_datatype, **kwargs ) elif query_parse == "binary": # NOTE: Maybe should specify 'data_points' (length of data) # when using binary, pyvisa-py throws warning sometimes return self._device.query_binary_values( command, parse_datatype, **kwargs ) else: self._device.write(command) except pyvisa.errors.VisaIOError as e: if e.abbreviation == "VI_ERROR_TMO": self.log.debug("Possible cause 1: Syntax error in SCPI query command") self.log.debug("Possible cause 2: Too many SCPI commands sent at once," " interlace with queries (e.g. *idn?) to wait for" " the device to catch up or increase _device.timeout") elif e.abbreviation == "VI_ERROR_IO": self.log.debug("Possible cause 1: Cable disconnected or device turned off") self.log.debug("Possible cause 2: Too short _device.timeout") raise base.TransmissionError(str(e)) except socket.timeout: raise base.TransmissionError("Device not responding") except pyvisa_py.protocols.rpc.RPCError: raise base.TransmissionError("Broken connection (xid mismatch)")
[docs] class TestVisaHardwareBase(VisaHardwareBase): """Mockup class to simulate communication with VISA device. Use for testing and dry-run of subclasses that are based on VisaHardwareBase. """ def __init__(self, resource_name, **kwargs): super().__init__(**kwargs) self.log.debug("Test -> {}".format(resource_name)) def _connect(self): self.log.debug("Test -> _connect") return True def _disconnect(self): self.log.debug("Test -> _disconnect") return def _is_ready(self): self.log.debug("Test -> _is_ready") return True
[docs] @base.base_command def raw_scpi(self, command, query_parse="none"): if not isinstance(command, str): raise TypeError("Expected str got {} instead".format(type(command))) self.log.debug("Test -> SCPI command '{}'".format(command)) if query_parse not in ["none", "ascii", "binary"]: raise ValueError("Expected query_parse='none', 'ascii' or 'binary'") if command[-1] == "?": return 0