import serial
import time
import pathlib
import struct
import json
import numpy as np
from . import base
SAMPLE_DT = 0.005 # Sampling dt for precalculated virtual waveform in memory
RUN_DT = 0.05 # Time between set_current() commands, i.e. the real dt
[docs]
class PowerSupply_Magnets(base.HardwareBase):
"""Power supply for experiment with magnetic field probes.
TWINTEX, Programmable/Switching Mode DC Power Supply, PCL series
Communication:
- Serial bus (USB).
"""
def __init__(self, bus, model_name, **kwargs):
"""Constructor
:param str bus: name of serial port, e.g. '/dev/ttyUSB0'
:param str model_name: e.g. 'PCL2400-1H', check info label on the device
"""
super().__init__(**kwargs)
self._bus = None
self._busname = bus
self._model_name = model_name # PCL<max_power>-<max_voltage>
self._max = self._parse_model_name(model_name)
self._last = {
"voltage": 0,
"current": 0,
}
# Waveform templet and waveform itself, both arrays [[t0, I0], ...]
self._wf_user = np.zeros((2, 0))
self._wf_sampled = np.zeros((2, 0))
self._dir_saved_data = pathlib.Path.home() / "saved_data"
@staticmethod
def _parse_model_name(name):
if name[:3] != "PCL":
raise ValueError("Not a PCL-series model name: {}".format(name))
try:
power, volts = name[3:].split("-")
volts = 100 * float(volts[:-1]) if volts[-1] == "H" else float(volts)
return {"voltage": volts, "current": float(power) / volts}
except (ValueError, IndexError):
raise ValueError("Not a PCL-series model name: {}".format(name))
@staticmethod
def _append_crc(byte_msg):
"""Calculate CRC-16/MODBUS and return message+CRC.
:param bytes byte_msg: the message as bytes or bytearray
"""
crc = 0xFFFF
for b in byte_msg:
crc = (crc >> 8) ^ _CRC_TABLE[(crc ^ b) & 0xFF]
return byte_msg + struct.pack("<H", crc)
@base.base_command
def _send(self, command, floatdata=None):
time.sleep(0.05) # Wait a bit, just to make sure previous command is done TODO Necessary?
byte_msg = _MESSAGE[command]
if floatdata is not None:
byte_msg += struct.pack(">f", float(floatdata))
self._bus.write(self._append_crc(byte_msg))
byte_resp = self._bus.read(6 + 2) # TODO Read one by one and check if resp not error?
if len(byte_resp) != 8: # Try again, helps when waiting for first message
byte_resp += self._bus.read(8 - len(byte_resp))
if len(byte_resp) == 0:
raise base.TransmissionError("No response from device")
if byte_resp != self._append_crc(byte_resp[:-2]):
raise base.TransmissionError("Response has corrupted CRC")
elif byte_resp[:6] != _MESSAGE[command][:6]:
raise base.DeviceError("Negative response - something went wrong")
time.sleep(0.05) # TODO Necessary?
@base.base_command
def _query(self, command):
self._bus.write(self._append_crc(_MESSAGE_QUERY[command]))
byte_resp = self._bus.read(3 + 4 + 2)
if byte_resp != self._append_crc(byte_resp[:-2]):
raise base.TransmissionError("Response has corrupted CRC")
# TODO Check if reponse is not error?
return struct.unpack(">f", byte_resp[3: 7])[0]
# ==== Inherited abstract methods ====
def _connect(self):
try:
self._bus = serial.Serial(self._busname, 9600, timeout=1)
except serial.SerialException:
self.log.debug("Failed to open serial '%s'", self._busname)
return False
# Set remote control
try:
self._send("set_remote")
except base.TransmissionError as e:
self.log.debug(str(e))
return False
return True
def _disconnect(self):
self._bus.close()
self._bus = None
return
def _is_ready(self):
return True
def _safestate(self):
# TODO Resolve what actually works (seems that setting max values
# to 0 does not work & it is not useful & may be annoying)
self.set_current(0)
# self.set_voltage(0)
# self.set_maxcurrent(0)
# self.set_maxvoltage(0)
# ==== Commands ====
@property
def model_name(self):
return self._model_name
@property
def model_voltage(self):
return self._max["voltage"]
@property
def model_current(self):
return self._max["current"]
@property
def waveform(self):
return self._wf_user
[docs]
@base.base_command
def voltage(self):
"""Return output voltage [V]."""
self._last["voltage"] = self._query("read_voltage")
return self._last["voltage"]
[docs]
@base.base_command
def set_voltage(self, voltage):
"""Set voltage and apply [V]."""
self._send("set_voltage", float(voltage))
self._send("apply_voltage")
[docs]
@base.base_command
def set_maxvoltage(self, voltage):
"""Set maximal voltage [V]."""
self._send("set_voltage_max", float(voltage))
# self._send("apply_voltage") # TODO Apply?
[docs]
@base.base_command
def current(self):
"""Return output current [A]."""
self._last["current"] = self._query("read_current")
return self._last["current"]
[docs]
@base.base_command
def set_current(self, current):
"""Set current and apply [A]."""
self._send("set_current", float(current))
self._send("apply_current")
[docs]
@base.base_command
def set_maxcurrent(self, current):
"""Set maximal current [A]."""
self._send("set_current_max", float(current))
# NOTE This method must be base command (even though it does not use _bus)
# in order to prevent modifying the waveform when run_waveform() is running.
# ==== Methods/commands related to DeviceClient ====
@base.idle_command
def _idle_update_all(self):
# Call to each of these methods updates self._last
self.voltage()
self.current()
[docs]
def update_frontend(self, device_client):
self._idle_update_all()
for var, unit in zip(["voltage", "current"], ["V", "A"]):
value = self._last[var]
device_client.emit("value", {
"value": value,
"formatted": "{:.0f} {}".format(value, unit),
"label": var.capitalize(),
"min": 0,
"max": self._max[var],
"id": var
})
_CRC_TABLE = ( # Table for CRC-16/MODBUS
0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241,
0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440,
0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40,
0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841,
0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40,
0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41,
0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641,
0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040,
0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240,
0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441,
0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41,
0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840,
0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41,
0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40,
0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640,
0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041,
0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240,
0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441,
0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41,
0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840,
0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41,
0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40,
0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640,
0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041,
0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241,
0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440,
0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40,
0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841,
0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40,
0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41,
0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641,
0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040,
)
_MESSAGE = {
# Format: 0x01 | <2B ?> | <2B register addr.> | data/apply | data len | <nB data>
"set_remote": bytes([0x01, 0x05, 0x05, 0x00, 0xff, 0x00]),
"set_voltage": bytes([0x01, 0x10, 0x0a, 0x05, 0x00, 0x02] + [0x04]),
"set_voltage_max": bytes([0x01, 0x10, 0x0a, 0x01, 0x00, 0x02] + [0x04]),
"apply_voltage": bytes([0x01, 0x10, 0x0a, 0x00, 0x00, 0x01] + [0x02, 0x00, 0x01]),
"set_current": bytes([0x01, 0x10, 0x0a, 0x07, 0x00, 0x02] + [0x04]),
"set_current_max": bytes([0x01, 0x10, 0x0a, 0x03, 0x00, 0x02] + [0x04]),
"apply_current": bytes([0x01, 0x10, 0x0a, 0x00, 0x00, 0x01] + [0x02, 0x00, 0x02]),
}
_MESSAGE_QUERY = {
"read_voltage": bytes([0x01, 0x03, 0x0b, 0x00, 0x00, 0x02]),
"read_current": bytes([0x01, 0x03, 0x0b, 0x02, 0x00, 0x02]),
}