Source code for device.power_supply_magnets

import serial

import time
import pathlib
import struct
import json
import numpy as np

from . import base


SAMPLE_DT = 0.005  # Sampling dt for precalculated virtual waveform in memory
RUN_DT = 0.05      # Time between set_current() commands, i.e. the real dt


[docs] class PowerSupply_Magnets(base.HardwareBase): """Power supply for experiment with magnetic field probes. TWINTEX, Programmable/Switching Mode DC Power Supply, PCL series Communication: - Serial bus (USB). """ def __init__(self, bus, model_name, **kwargs): """Constructor :param str bus: name of serial port, e.g. '/dev/ttyUSB0' :param str model_name: e.g. 'PCL2400-1H', check info label on the device """ super().__init__(**kwargs) self._bus = None self._busname = bus self._model_name = model_name # PCL<max_power>-<max_voltage> self._max = self._parse_model_name(model_name) self._last = { "voltage": 0, "current": 0, } # Waveform templet and waveform itself, both arrays [[t0, I0], ...] self._wf_user = np.zeros((2, 0)) self._wf_sampled = np.zeros((2, 0)) self._dir_saved_data = pathlib.Path.home() / "saved_data" @staticmethod def _parse_model_name(name): if name[:3] != "PCL": raise ValueError("Not a PCL-series model name: {}".format(name)) try: power, volts = name[3:].split("-") volts = 100 * float(volts[:-1]) if volts[-1] == "H" else float(volts) return {"voltage": volts, "current": float(power) / volts} except (ValueError, IndexError): raise ValueError("Not a PCL-series model name: {}".format(name)) @staticmethod def _append_crc(byte_msg): """Calculate CRC-16/MODBUS and return message+CRC. :param bytes byte_msg: the message as bytes or bytearray """ crc = 0xFFFF for b in byte_msg: crc = (crc >> 8) ^ _CRC_TABLE[(crc ^ b) & 0xFF] return byte_msg + struct.pack("<H", crc) @base.base_command def _send(self, command, floatdata=None): time.sleep(0.05) # Wait a bit, just to make sure previous command is done TODO Necessary? byte_msg = _MESSAGE[command] if floatdata is not None: byte_msg += struct.pack(">f", float(floatdata)) self._bus.write(self._append_crc(byte_msg)) byte_resp = self._bus.read(6 + 2) # TODO Read one by one and check if resp not error? if len(byte_resp) != 8: # Try again, helps when waiting for first message byte_resp += self._bus.read(8 - len(byte_resp)) if len(byte_resp) == 0: raise base.TransmissionError("No response from device") if byte_resp != self._append_crc(byte_resp[:-2]): raise base.TransmissionError("Response has corrupted CRC") elif byte_resp[:6] != _MESSAGE[command][:6]: raise base.DeviceError("Negative response - something went wrong") time.sleep(0.05) # TODO Necessary? @base.base_command def _query(self, command): self._bus.write(self._append_crc(_MESSAGE_QUERY[command])) byte_resp = self._bus.read(3 + 4 + 2) if byte_resp != self._append_crc(byte_resp[:-2]): raise base.TransmissionError("Response has corrupted CRC") # TODO Check if reponse is not error? return struct.unpack(">f", byte_resp[3: 7])[0] # ==== Inherited abstract methods ==== def _connect(self): try: self._bus = serial.Serial(self._busname, 9600, timeout=1) except serial.SerialException: self.log.debug("Failed to open serial '%s'", self._busname) return False # Set remote control try: self._send("set_remote") except base.TransmissionError as e: self.log.debug(str(e)) return False return True def _disconnect(self): self._bus.close() self._bus = None return def _is_ready(self): return True def _safestate(self): # TODO Resolve what actually works (seems that setting max values # to 0 does not work & it is not useful & may be annoying) self.set_current(0) # self.set_voltage(0) # self.set_maxcurrent(0) # self.set_maxvoltage(0) # ==== Commands ==== @property def model_name(self): return self._model_name @property def model_voltage(self): return self._max["voltage"] @property def model_current(self): return self._max["current"] @property def waveform(self): return self._wf_user
[docs] @base.base_command def voltage(self): """Return output voltage [V].""" self._last["voltage"] = self._query("read_voltage") return self._last["voltage"]
[docs] @base.base_command def set_voltage(self, voltage): """Set voltage and apply [V].""" self._send("set_voltage", float(voltage)) self._send("apply_voltage")
[docs] @base.base_command def set_maxvoltage(self, voltage): """Set maximal voltage [V].""" self._send("set_voltage_max", float(voltage))
# self._send("apply_voltage") # TODO Apply?
[docs] @base.base_command def current(self): """Return output current [A].""" self._last["current"] = self._query("read_current") return self._last["current"]
[docs] @base.base_command def set_current(self, current): """Set current and apply [A].""" self._send("set_current", float(current)) self._send("apply_current")
[docs] @base.base_command def set_maxcurrent(self, current): """Set maximal current [A].""" self._send("set_current_max", float(current))
# NOTE This method must be base command (even though it does not use _bus) # in order to prevent modifying the waveform when run_waveform() is running.
[docs] @base.base_command def set_waveform(self, device_client, data): """Set waveform I(t), use format [[t0, I0], [t1, I1], ...].""" if isinstance(data, str): try: data = json.loads(data) except json.decoder.JSONDecodeError: raise base.CommandError("Use format [[t0, I0], [t1, I1], ...]") try: data = np.array(data, dtype=float) except (TypeError, ValueError): raise base.CommandError("Use format [[t0, I0], [t1, I1], ...]") if len(data.shape) != 2 or 2 not in data.shape: raise base.CommandError("Use format [[t0, I0], [t1, I1], ...]") data = data if data.shape[0] == 2 else data.T if data.shape[1] < 2: raise base.CommandError("Waveform must have at least 2 points") # Set user-provided waveform templet, sort by time self._wf_user = data[:, np.argsort(data[0])] # Line interpolate and sample with constant dt def interp(arr, t): k = (t - arr[0, 0]) / (arr[0, 1] - arr[0, 0]) return arr[1, 0] + k * (arr[1, 1] - arr[1, 0]) samples = int(np.ceil(max(self._wf_user[0, -1], 0) / SAMPLE_DT)) + 1 self._wf_sampled = np.stack([ np.arange(samples, dtype=float) * SAMPLE_DT, np.zeros(samples, dtype=float) ]) # Interpolation: iterating over samples and also over neighbouring pairs # in user templet, sample value = interpolation of pair def fill_in_samples(wf_sampled): j = 0 for i, t in enumerate(wf_sampled[0]): if t < self._wf_user[0, 0]: continue while t > self._wf_user[0, j+1]: j += 1 if j + 1 >= self._wf_user.shape[1]: return wf_sampled[1, i] = interp(self._wf_user[:, j:j+2], t) fill_in_samples(self._wf_sampled) if device_client is not None: device_client.emit("graph", { "title": "Waveform", "x": self._wf_user[0].tolist(), "y": self._wf_user[1].tolist(), "xlabel": "t [s]", "ylabel": "I [A]", "id": "graph_user_waveform" }) self.log.info("New waveform I(t) set ({} points)".format(data.shape[1])) if np.min(self._wf_user[0, 1:] - self._wf_user[0, :-1]) <= 2 * RUN_DT: self.log.warn("Encountered points closer than 2*dt_per_sample" " = {:.0f} ms".format(RUN_DT * 1e3))
[docs] @base.compound_command def run_waveform(self, device_client): """Execute the current waveform I(t).""" if self._wf_sampled.shape[1] == 0: raise base.CommandError("No waveform data were set") self.log.info("Waveform I(t) started") current_output = [] #TODO Check whether time.time() is ok, or if perhaps time.perf_counter() would be better t0 = time.time() t = 0 while t < self._wf_sampled[0, -1]: current_output.append([t, self.current()]) self.set_current(self._wf_sampled[1, round(t / SAMPLE_DT)]) t = time.time() - t0 self.set_current(0) current_output.append([time.time()-t0, self.current()]) current_output = np.array(current_output, dtype=float) header = "timestamp {:.3f}\ntime[s], current[A]".format(t0) filepath = "magnets_current_{}_{{}}.txt".format(time.strftime("%Y-%m-%d_%H-%M")) suffix = 1 while (self._dir_saved_data / filepath.format(suffix)).exists(): suffix += 1 filepath = self._dir_saved_data / filepath.format(suffix) np.savetxt(str(filepath), current_output, fmt="%.3f", header=header) if device_client is not None: device_client.emit("graph", { "title": "Measured output", "x": current_output[:, 0].tolist(), "y": current_output[:, 1].tolist(), "xlabel": "t [s]", "ylabel": "I [A]", "id": "graph_measured_output" }) with filepath.open("r") as file: device_client.emit_datafile(file, filepath.name) self.log.info("Waveform I(t) finished")
# ==== Methods/commands related to DeviceClient ==== @base.idle_command def _idle_update_all(self): # Call to each of these methods updates self._last self.voltage() self.current()
[docs] def update_frontend(self, device_client): self._idle_update_all() for var, unit in zip(["voltage", "current"], ["V", "A"]): value = self._last[var] device_client.emit("value", { "value": value, "formatted": "{:.0f} {}".format(value, unit), "label": var.capitalize(), "min": 0, "max": self._max[var], "id": var })
_CRC_TABLE = ( # Table for CRC-16/MODBUS 0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241, 0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440, 0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40, 0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841, 0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40, 0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41, 0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641, 0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040, 0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240, 0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441, 0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41, 0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840, 0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41, 0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40, 0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640, 0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041, 0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240, 0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441, 0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41, 0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840, 0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41, 0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40, 0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640, 0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041, 0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241, 0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440, 0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40, 0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841, 0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40, 0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41, 0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641, 0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040, ) _MESSAGE = { # Format: 0x01 | <2B ?> | <2B register addr.> | data/apply | data len | <nB data> "set_remote": bytes([0x01, 0x05, 0x05, 0x00, 0xff, 0x00]), "set_voltage": bytes([0x01, 0x10, 0x0a, 0x05, 0x00, 0x02] + [0x04]), "set_voltage_max": bytes([0x01, 0x10, 0x0a, 0x01, 0x00, 0x02] + [0x04]), "apply_voltage": bytes([0x01, 0x10, 0x0a, 0x00, 0x00, 0x01] + [0x02, 0x00, 0x01]), "set_current": bytes([0x01, 0x10, 0x0a, 0x07, 0x00, 0x02] + [0x04]), "set_current_max": bytes([0x01, 0x10, 0x0a, 0x03, 0x00, 0x02] + [0x04]), "apply_current": bytes([0x01, 0x10, 0x0a, 0x00, 0x00, 0x01] + [0x02, 0x00, 0x02]), } _MESSAGE_QUERY = { "read_voltage": bytes([0x01, 0x03, 0x0b, 0x00, 0x00, 0x02]), "read_current": bytes([0x01, 0x03, 0x0b, 0x02, 0x00, 0x02]), }