from types import SimpleNamespace
import time
import serial
from . import base
[docs]
class TrapDiagnostics(base.ArduinoBase):
"""Multi-purpuse Arduino controller that handles
- 16x DS18B20 digital thermometers, one for each coil
- 1x thermocouple, measures temp of the heated cathode insulation
- 1x Hall probe
- 1x needle valve
Communication:
- serial interface to Arduino
"""
def __init__(self, bus, bus_switch, **kwargs):
"""Constructor
:param str bus: Name of serial bus where the main Arduino is connected,
e.g. '/dev/ttyUSB0'
:param str bus_switch: Name of serial bus where the Arduino controlling
the PS switch is connected, e.g. '/dev/ttyUSB0'
"""
super().__init__(
serial_cfg=base.SerialConfig(bus, 9600, "-START-"),
**kwargs,
)
self._bus_switch = bus_switch
self._last = SimpleNamespace(
needle_percent=0,
main_switch_on=False,
)
# ==== Inherited abstract methods ====
def _connect(self):
if not super()._connect():
return False
self._readline() # "Locating ...\r\n"
recv = self._readline()
if recv != "START PWM OK":
self.log.debug("Arduino failed START PWM")
self._disconnect()
return False
# Connect to the 2nd Arduino (PS Switch)
def reset(ardu):
ardu.dtr = False
time.sleep(0.1)
ardu.reset_input_buffer()
ardu.dtr = True
try:
self._ardu_switch = serial.Serial(self._bus_switch, 9600)
except serial.SerialException:
self.log.debug("Failed to open serial '%s' (SWITCH)", self._bus_switch)
super()._disconnect()
return False
reset(self._ardu_switch)
self._ardu_switch.timeout = 5
# TODO Not even sure if it sends any hello message
self._ardu_switch.readline().strip().decode("ascii")
return True
def _disconnect(self):
super()._disconnect()
self.main_switch_off()
self._ardu_switch.close()
self._ardu_switch = None
def _defaultstate(self):
super()._defaultstate()
self._last.needle_percent = 0
self._last.main_switch_on = False
# def _safestate(self): # TODO implement?
# pass
# ==== Private commands ====
# ==== Commands ====
[docs]
def is_main_on(self):
return self._last.main_switch_on
[docs]
@base.base_command
def main_switch_on(self):
if not self.is_main_on():
self._ardu_switch.write(b"s")
self._last.main_switch_on = True
[docs]
@base.base_command
def main_switch_off(self):
if self.is_main_on():
self._ardu_switch.write(b"s")
self._last.main_switch_on = False
[docs]
@base.base_command
def temp_coils(self):
"""Read temperature of all 16 coils."""
self._write("r")
# Expected format: "23.5 23.2 25.0 ... 24.0\r\n" (16x float)
return [float(t) for t in self._readline().split(" ")]
[docs]
@base.base_command
def temp_cathode(self):
"""Read temperature of heated cathode insulation."""
self._write("t")
# Expected format: "235.5\r\n"
return float(self._readline())
[docs]
@base.base_command
def measure_field(self):
"""Measure magnetic field using the Hall probe."""
self._write("h")
# Expected format: "2.55 V 15 mT\r\n" or "15 mT\r\n"
return float(self._readline().split(" ")[-2])
[docs]
@base.base_command
def set_needle(self, percent):
"""Set state of the needle valve 0-100%.
:param float percent: Percentage in range 0-100, where 0 is closed.
"""
percent = float(percent)
if not (0.0 <= percent <= 100.0):
raise base.CommandError("Value percent must be between 0-100")
self._write(f"p{round(percent * 255 / 100):d}")
self._last.needle_percent = percent
[docs]
@base.base_command
def set_needle_for(self, percent, duration):
"""Set state of the needle valve 0-100%, close it when duration is
exceeded.
:param float percent: Percentage in range 0-100, where 0 is closed.
:param float duration: Time interval in seconds.
"""
self.set_needle(percent)
time.sleep(max(0, float(duration)))
self.set_needle(0)
[docs]
@base.base_command
def close_needle(self):
"""Close the needle valve, shorthand for set_needle(0)."""
self.set_needle(0)
[docs]
def get_needle(self):
"""Get current state of the needle valve 0-100%."""
return self._last.needle_percent
# ==== Methods/commands related to DeviceClient ====
[docs]
@base.idle_command
def update_frontend(self, device_client):
def make_value(value, formatted, label, minmax, id):
return {"value": value, "formatted": formatted.format(value),
"label": label, "min": minmax[0], "max": minmax[1], "id": id}
device_client.emit("value", make_value(
self.temp_cathode(), "{:.1f} °C", "Cathode", [0, 100], "temp_cathode"
))
temp_coils = self.temp_coils()
item = max(enumerate(temp_coils), key=lambda item: item[1])
device_client.emit("value", make_value(
item[1], "{:.1f} °C", f"Coil {item[0]}", [0, 300], "temp_coils_max"
))
item = min(enumerate(temp_coils), key=lambda item: item[1])
device_client.emit("value", make_value(
item[1], "{:.1f} °C", f"Coil {item[0]}", [0, 300], "temp_coils_min"
))
device_client.emit("value", make_value(
self.measure_field(), "{:.1f} mT", "Hall probe", [0, 100], "hall"
))
device_client.emit("value", make_value(
self.get_needle(), "{:.1f} %", "Needle", [0, 100], "needle"
))