import serial
import time
import glob
import re
from . import base
[docs]
class ResonanceController(base.HardwareBase):
"""Arduino based multipurpose controller (resonance experiment).
Communication:
- serial interface to Arduino
"""
POWER_LIMS = (0, 80) # RF generator output power limits [W]
def __init__(self, bus=None, **kwargs):
"""Constructor
:param str bus: (optional) specify the name of serial bus where
Arduino is connected, e.g. ``/dev/ttyUSB0``
"""
super().__init__(**kwargs)
# Automatically search for USB serial device if none were specified
if bus is None:
buses = glob.glob("/dev/ttyUSB*")
if len(buses) != 1:
msg = "Expected 1 usb serial port, got {} instead"
raise RuntimeError(msg.format(len(buses)))
self._bus = buses[0]
else:
self._bus = bus
self._ardu = None
self._last = {}
self._progress = 0
# ==== Inherited abstract methods ====
def _connect(self):
try:
self._ardu = serial.Serial(self._bus, 9600)
except serial.SerialException:
self.log("Failed to open serial '%s'", self._bus)
return False
# Reset
self._ardu.setDTR(False)
time.sleep(0.1)
self._ardu.reset_input_buffer()
self._ardu.setDTR(True)
self._ardu.readline() # Initial message "START\r\n"
self._read_current_states()
return True
def _disconnect(self):
# TODO Close everything
self.output_off()
# self.close_valve()
# self.set_device_relays(False, False)
self._ardu.close()
self._ardu = None
def _is_ready(self):
return True # TODO Maybe check something? Test with cable disconnect
def _safestate(self):
self.output_off()
# self.close_valve() # TODO Should the valve be closing?
# ==== Private commands ====
@base.base_command
def _readresp(self):
return self._ardu.readline().strip().decode("ascii")
@base.base_command
def _writecmd(self, cmd):
self._ardu.write(cmd.encode("ascii"))
@base.base_command
def _writecmd_toggle(self, cmd, key_in_last):
self._writecmd(cmd)
self._last[key_in_last] = not self._last[key_in_last]
@base.base_command
def _read_current_states(self):
self._writecmd("i")
# Ignore any left-over lines, then parse power (actual and reflected)
#
# "0.00 V na P_FRW. 0.00 W do zateze."
# "0.00 V na P_REV. 0.00 W odrazeny."
line = ""
while "P_FRW" not in line:
line = self._readresp()
parsed = re.search(r"P_FRW\. ([0-9]+\.[0-9]+) W", line)
self._last["rf_power_actual"] = float(parsed.group(1))
parsed = re.search(r"P_REV\. ([0-9]+\.[0-9]+) W", self._readresp())
self._last["rf_power_reflected"] = float(parsed.group(1))
# Parse various states
#
# "Limit state 0" (not used)
# "Output state 1"
# "Output set 0" (not used)
# "Relay state 1" - relay for RF generator
# "Ventil state 0"
self._readresp()
self._last["rf_output"] = self._readresp()[-1] == "1"
self._readresp()
self._last["rf_on"] = self._readresp()[-1] == "1"
self._last["valve_open"] = self._readresp()[-1] == "1"
self._last["sa_on"] = False # self._readresp()[-1] == "1" # TODO Is this implemented? (second relay)
def _validate_power(self, power):
power = float(power)
if self.POWER_LIMS[0] <= power <= self.POWER_LIMS[1]:
return power
else:
msg = "Power value out of bounds ({:.2f}-{:.2f} W)"
raise base.CommandError(msg.format(*self.POWER_LIMS))
# ==== Commands ====
[docs]
@base.base_command
def set_device_relays(self, rf_gen, spect_an):
"""Enable or disable power to devices.
:param bool rf_gen: RF generator on/off
:param bool spect_an: spectral analyser on/off
"""
def parse(s):
textvals = {
"on": True, "true": True, "1": True,
"off": False, "false": False, "0": False,
}
if isinstance(s, bool):
return s
elif isinstance(s, str) and s in textvals:
return textvals[s]
else:
raise base.CommandError("Please use on/off, 1/0, true/false")
if parse(rf_gen):
self.switch_generator_on()
else:
self.switch_generator_off()
if parse(spect_an):
self.switch_spectral_on()
else:
self.switch_spectral_on()
[docs]
@base.base_command
def is_generator_on(self):
"""Return the state of relay that powers output generator."""
self._read_current_states()
return self._last["rf_on"]
[docs]
@base.base_command
def switch_generator_on(self):
if not self.is_generator_on():
self._writecmd("r")
[docs]
@base.base_command
def switch_generator_off(self):
if self.is_generator_on():
self._writecmd("r")
[docs]
@base.base_command
def is_spectral_on(self):
"""Return the state of relay that powers spectral analyser."""
self._read_current_states()
return self._last["sa_on"] # TODO Is this already implemented in arduino?
[docs]
@base.base_command
def switch_spectral_on(self):
if not self.is_spectral_on():
self._writecmd("a") # TODO Is this already implemented in arduino?
[docs]
@base.base_command
def switch_spectral_off(self):
if self.is_spectral_on():
self._writecmd("a") # TODO Is this already implemented in arduino?
[docs]
@base.base_command
def is_valve_open(self):
"""Return the state of the valve."""
self._read_current_states()
return self._last["valve_open"]
[docs]
@base.base_command
def open_valve(self):
"""Open valve."""
if not self.is_valve_open():
self._writecmd_toggle("v", "valve_open")
[docs]
@base.base_command
def close_valve(self):
"""Close valve."""
if self.is_valve_open():
self._writecmd_toggle("v", "valve_open")
[docs]
@base.base_command
def is_output_on(self):
"""Return the state of RF generator output."""
self._read_current_states()
return self._last["rf_output"]
[docs]
@base.base_command
def output_on(self, power):
"""Enable RF generator output.
:param float power: output power [W]
"""
power = self._validate_power(power)
self._writecmd("w{:.0f}".format(power)) # TODO Should the generator be disabled first before setting power?
self._readresp() # Response "Set watts 70.00 equals Volts 3.50"
# self._last["rf_power_set"] = power
if not self.is_output_on():
self._writecmd_toggle("o", "rf_output")
[docs]
@base.compound_command
def output_on_for(self, power, duration):
"""Enable RF generator output for a limited amount of time.
:param float power: output power [W]
:param float duration: time interval [s]
"""
duration = float(duration)
power = self._validate_power(power)
self.log.info("Power on {:.1f}W for {:.1f}s".format(power, duration))
self.output_on(power)
t0 = time.time()
while(time.time() - t0 < duration):
self._read_current_states()
self._progress = (time.time() - t0) / duration
time.sleep(0.1)
self._progress = 1
self.output_off()
[docs]
@base.base_command
def output_off(self):
"""Disable RF generator output."""
if self.is_output_on():
self._writecmd_toggle("o", "rf_output")
self._writecmd("w0") # Power -> 0 W
# self._last["rf_power_set"] = 0.0
# ==== Methods/commands related to DeviceClient ====
@base.idle_command
def _idle_update_states(self):
self._read_current_states()
[docs]
def update_frontend(self, device_client):
self._idle_update_states()
value = self._last["rf_power_actual"]
device_client.emit("value", {
"value": value,
"formatted": "{:.2f} W".format(value),
"label": "Output P",
"min": self.POWER_LIMS[0],
"max": self.POWER_LIMS[1],
"id": "rf_power_actual"
})
value = self._last["rf_power_reflected"]
device_client.emit("value", {
"value": value,
"formatted": "{:.2f} W".format(value),
"label": "Reflected P",
"min": self.POWER_LIMS[0],
"max": self.POWER_LIMS[1],
"id": "rf_power_reflected"
})
device_client.emit("value", {
"value": self._progress,
"formatted": "",
"label": "Task progress",
"min": 0,
"max": 1,
"id": "state_progress"
})
state = self._last["valve_open"]
device_client.emit("value", {
"value": 1 if state else 0,
"formatted": "open" if state else "closed",
"label": "Valve",
"min": 0,
"max": 1,
"id": "state_relay_valve"
})