Source code for device.resonance_controller

import serial

import time
import glob
import re

from . import base


class ResonanceController(base.HardwareBase):
    """Arduino based multipurpose controller (resonance experiment).

    Communication:
     - serial interface to Arduino
    """

    POWER_LIMS = (0, 80)  # RF generator output power limits [W]

    def __init__(self, bus=None, **kwargs):
        """Constructor

        :param str bus: (optional) specify the name of serial bus where
            Arduino is connected, e.g. ``/dev/ttyUSB0``
        :raises RuntimeError: if ``bus`` is not given and the number of
            ``/dev/ttyUSB*`` devices found is not exactly one
        """
        super().__init__(**kwargs)

        # Automatically search for USB serial device if none were specified
        if bus is None:
            buses = glob.glob("/dev/ttyUSB*")
            if len(buses) != 1:
                msg = "Expected 1 usb serial port, got {} instead"
                raise RuntimeError(msg.format(len(buses)))
            bus = buses[0]

        self._bus = bus
        self._ardu = None  # serial.Serial instance while connected
        self._last = {}  # most recently parsed device state
        self._progress = 0  # progress of a timed task (0..1)

    # ==== Inherited abstract methods ====

    def _connect(self):
        """Open the serial port, reset the Arduino and read its state.

        :return: ``True`` on success, ``False`` if the serial port could
            not be opened
        """
        try:
            self._ardu = serial.Serial(self._bus, 9600)
        except serial.SerialException:
            # ``self.log`` is used as a logger object elsewhere in this
            # class (``self.log.info(...)``), so call a logging method on
            # it instead of calling the object itself.
            self.log.error("Failed to open serial '%s'", self._bus)
            return False

        # Reset
        self._ardu.setDTR(False)
        time.sleep(0.1)
        self._ardu.reset_input_buffer()
        self._ardu.setDTR(True)
        self._ardu.readline()  # Initial message "START\r\n"

        self._read_current_states()
        return True

    def _disconnect(self):
        """Switch the RF output off and close the serial port."""
        # TODO Close everything
        try:
            self.output_off()
            # self.close_valve()
            # self.set_device_relays(False, False)
        finally:
            # Release the port even if switching the output off failed,
            # otherwise the serial handle would stay open.
            self._ardu.close()
            self._ardu = None

    def _is_ready(self):
        """Report readiness; currently always ``True``."""
        return True  # TODO Maybe check something? Test with cable disconnect

    def _safestate(self):
        """Bring the device to a safe state by switching the RF output off."""
        self.output_off()
        # self.close_valve()  # TODO Should the valve be closing?
# ==== Private commands ==== @base.base_command def _readresp(self): return self._ardu.readline().strip().decode("ascii") @base.base_command def _writecmd(self, cmd): self._ardu.write(cmd.encode("ascii")) @base.base_command def _writecmd_toggle(self, cmd, key_in_last): self._writecmd(cmd) self._last[key_in_last] = not self._last[key_in_last] @base.base_command def _read_current_states(self): self._writecmd("i") # Ignore any left-over lines, then parse power (actual and reflected) # # "0.00 V na P_FRW. 0.00 W do zateze." # "0.00 V na P_REV. 0.00 W odrazeny." line = "" while "P_FRW" not in line: line = self._readresp() parsed = re.search(r"P_FRW\. ([0-9]+\.[0-9]+) W", line) self._last["rf_power_actual"] = float(parsed.group(1)) parsed = re.search(r"P_REV\. ([0-9]+\.[0-9]+) W", self._readresp()) self._last["rf_power_reflected"] = float(parsed.group(1)) # Parse various states # # "Limit state 0" (not used) # "Output state 1" # "Output set 0" (not used) # "Relay state 1" - relay for RF generator # "Ventil state 0" self._readresp() self._last["rf_output"] = self._readresp()[-1] == "1" self._readresp() self._last["rf_on"] = self._readresp()[-1] == "1" self._last["valve_open"] = self._readresp()[-1] == "1" self._last["sa_on"] = False # self._readresp()[-1] == "1" # TODO Is this implemented? (second relay) def _validate_power(self, power): power = float(power) if self.POWER_LIMS[0] <= power <= self.POWER_LIMS[1]: return power else: msg = "Power value out of bounds ({:.2f}-{:.2f} W)" raise base.CommandError(msg.format(*self.POWER_LIMS)) # ==== Commands ====
[docs] @base.base_command def set_device_relays(self, rf_gen, spect_an): """Enable or disable power to devices. :param bool rf_gen: RF generator on/off :param bool spect_an: spectral analyser on/off """ def parse(s): textvals = { "on": True, "true": True, "1": True, "off": False, "false": False, "0": False, } if isinstance(s, bool): return s elif isinstance(s, str) and s in textvals: return textvals[s] else: raise base.CommandError("Please use on/off, 1/0, true/false") if parse(rf_gen): self.switch_generator_on() else: self.switch_generator_off() if parse(spect_an): self.switch_spectral_on() else: self.switch_spectral_on()
[docs] @base.base_command def is_generator_on(self): """Return the state of relay that powers output generator.""" self._read_current_states() return self._last["rf_on"]
[docs] @base.base_command def switch_generator_on(self): if not self.is_generator_on(): self._writecmd("r")
[docs] @base.base_command def switch_generator_off(self): if self.is_generator_on(): self._writecmd("r")
[docs] @base.base_command def is_spectral_on(self): """Return the state of relay that powers spectral analyser.""" self._read_current_states() return self._last["sa_on"] # TODO Is this already implemented in arduino?
[docs] @base.base_command def switch_spectral_on(self): if not self.is_spectral_on(): self._writecmd("a") # TODO Is this already implemented in arduino?
[docs] @base.base_command def switch_spectral_off(self): if self.is_spectral_on(): self._writecmd("a") # TODO Is this already implemented in arduino?
[docs] @base.base_command def is_valve_open(self): """Return the state of the valve.""" self._read_current_states() return self._last["valve_open"]
[docs] @base.base_command def open_valve(self): """Open valve.""" if not self.is_valve_open(): self._writecmd_toggle("v", "valve_open")
[docs] @base.base_command def close_valve(self): """Close valve.""" if self.is_valve_open(): self._writecmd_toggle("v", "valve_open")
[docs] @base.base_command def is_output_on(self): """Return the state of RF generator output.""" self._read_current_states() return self._last["rf_output"]
[docs] @base.base_command def output_on(self, power): """Enable RF generator output. :param float power: output power [W] """ power = self._validate_power(power) self._writecmd("w{:.0f}".format(power)) # TODO Should the generator be disabled first before setting power? self._readresp() # Response "Set watts 70.00 equals Volts 3.50" # self._last["rf_power_set"] = power if not self.is_output_on(): self._writecmd_toggle("o", "rf_output")
[docs] @base.compound_command def output_on_for(self, power, duration): """Enable RF generator output for a limited amount of time. :param float power: output power [W] :param float duration: time interval [s] """ duration = float(duration) power = self._validate_power(power) self.log.info("Power on {:.1f}W for {:.1f}s".format(power, duration)) self.output_on(power) t0 = time.time() while(time.time() - t0 < duration): self._read_current_states() self._progress = (time.time() - t0) / duration time.sleep(0.1) self._progress = 1 self.output_off()
[docs] @base.base_command def output_off(self): """Disable RF generator output. Sends the ``o`` toggle when the output is reported on, and writes ``w0`` to request 0 W. NOTE(review): the flattened source does not show whether ``w0`` is sent unconditionally or only inside the ``if`` -- confirm against the original file.""" if self.is_output_on(): self._writecmd_toggle("o", "rf_output") self._writecmd("w0") # Power -> 0 W
# self._last["rf_power_set"] = 0.0 # ==== Methods/commands related to DeviceClient ==== @base.idle_command def _idle_update_states(self): self._read_current_states()
[docs] def update_frontend(self, device_client): self._idle_update_states() value = self._last["rf_power_actual"] device_client.emit("value", { "value": value, "formatted": "{:.2f} W".format(value), "label": "Output P", "min": self.POWER_LIMS[0], "max": self.POWER_LIMS[1], "id": "rf_power_actual" }) value = self._last["rf_power_reflected"] device_client.emit("value", { "value": value, "formatted": "{:.2f} W".format(value), "label": "Reflected P", "min": self.POWER_LIMS[0], "max": self.POWER_LIMS[1], "id": "rf_power_reflected" }) device_client.emit("value", { "value": self._progress, "formatted": "", "label": "Task progress", "min": 0, "max": 1, "id": "state_progress" }) state = self._last["valve_open"] device_client.emit("value", { "value": 1 if state else 0, "formatted": "open" if state else "closed", "label": "Valve", "min": 0, "max": 1, "id": "state_relay_valve" })