Source code for device.trap_diganostics

from types import SimpleNamespace
import time
import serial

from . import base


class TrapDiagnostics(base.ArduinoBase):
    """Multi-purpose Arduino controller.

    Handles:

    - 16x DS18B20 digital thermometers, one for each coil
    - 1x thermocouple, measures temp of the heated cathode insulation
    - 1x Hall probe
    - 1x needle valve

    A second Arduino, opened on its own serial port, toggles the main PS
    switch.

    Communication:

    - serial interface to Arduino
    """

    def __init__(self, bus, bus_switch, **kwargs):
        """Constructor

        :param str bus: Name of serial bus where the main Arduino is
            connected, e.g. '/dev/ttyUSB0'
        :param str bus_switch: Name of serial bus where the Arduino
            controlling the PS switch is connected, e.g. '/dev/ttyUSB1'
        :param kwargs: Forwarded unchanged to ``base.ArduinoBase``.
        """
        super().__init__(
            serial_cfg=base.SerialConfig(bus, 9600, "-START-"),
            **kwargs,
        )
        self._bus_switch = bus_switch
        # Serial handle of the PS-switch Arduino. It is None whenever that
        # port is not open, which lets _disconnect() run safely at any time.
        self._ardu_switch = None
        # Locally cached state; neither value is read back from hardware.
        self._last = SimpleNamespace(
            needle_percent=0,
            main_switch_on=False,
        )

    # ==== Inherited abstract methods ====

    def _connect(self):
        """Connect to the main Arduino, then to the PS-switch Arduino.

        :return: True when both devices are ready, False otherwise. On
            failure everything opened so far is closed again.
        """
        if not super()._connect():
            return False
        self._readline()  # "Locating ...\r\n"
        recv = self._readline()
        if recv != "START PWM OK":
            self.log.debug("Arduino failed START PWM")
            # The switch port has not been opened yet; _disconnect() copes
            # with that because _ardu_switch is still None.
            self._disconnect()
            return False

        # Connect to the 2nd Arduino (PS Switch)
        def reset(ardu):
            # Toggle DTR, dropping any stale input while the line is low.
            ardu.dtr = False
            time.sleep(0.1)
            ardu.reset_input_buffer()
            ardu.dtr = True

        try:
            self._ardu_switch = serial.Serial(self._bus_switch, 9600)
        except serial.SerialException:
            self.log.debug(
                "Failed to open serial '%s' (SWITCH)", self._bus_switch
            )
            super()._disconnect()
            return False
        reset(self._ardu_switch)
        self._ardu_switch.timeout = 5
        # TODO Not even sure if it sends any hello message
        # The line is only drained, never inspected, so it is deliberately
        # not decoded: noise after the reset must not abort the connection.
        self._ardu_switch.readline()
        return True

    def _disconnect(self):
        """Disconnect the main Arduino and release the PS-switch port.

        Safe to call when the switch port was never opened or has already
        been closed.
        """
        super()._disconnect()
        if self._ardu_switch is None:
            return
        try:
            self.main_switch_off()
        finally:
            # Release the port even if switching off failed.
            self._ardu_switch.close()
            self._ardu_switch = None

    def _defaultstate(self):
        """Reset the locally cached needle and main-switch state."""
        super()._defaultstate()
        self._last.needle_percent = 0
        self._last.main_switch_on = False

    # def _safestate(self):  # TODO implement?
    #     pass

    # ==== Private commands ====

    # ==== Commands ====

    def is_main_on(self):
        """Return the cached main-switch state (True means on)."""
        return self._last.main_switch_on

    @base.base_command
    def main_switch_on(self):
        """Turn the main switch on; does nothing if it is cached as on."""
        if not self.is_main_on():
            # The same "s" byte is sent for both directions, so the cached
            # state is what decides whether to send it.
            self._ardu_switch.write(b"s")
            self._last.main_switch_on = True

    @base.base_command
    def main_switch_off(self):
        """Turn the main switch off; does nothing if it is cached as off."""
        if self.is_main_on():
            self._ardu_switch.write(b"s")
            self._last.main_switch_on = False

    @base.base_command
    def temp_coils(self):
        """Read temperature of all 16 coils.

        :return: One float per value in the reply line.
        :raises ValueError: If a value in the reply is not a number.
        """
        self._write("r")
        # Expected format: "23.5 23.2 25.0 ... 24.0\r\n" (16x float)
        # split() without a separator tolerates repeated or trailing
        # whitespace, which split(" ") would turn into empty tokens.
        return [float(t) for t in self._readline().split()]

    @base.base_command
    def temp_cathode(self):
        """Read temperature of heated cathode insulation.

        :return: The reply line converted to float.
        """
        self._write("t")
        # Expected format: "235.5\r\n"
        return float(self._readline())

    @base.base_command
    def measure_field(self):
        """Measure magnetic field using the Hall probe.

        :return: The second-to-last token of the reply as float, i.e. the
            number in front of "mT" in the expected formats.
        """
        self._write("h")
        # Expected format: "2.55 V 15 mT\r\n" or "15 mT\r\n"
        return float(self._readline().split()[-2])

    @base.base_command
    def set_needle(self, percent):
        """Set state of the needle valve 0-100%.

        :param float percent: Percentage in range 0-100, where 0 is closed.
        :raises base.CommandError: If percent is outside 0-100.
        """
        percent = float(percent)
        if not (0.0 <= percent <= 100.0):
            raise base.CommandError("Value percent must be between 0-100")
        # The value is sent rescaled from 0-100 to an integer 0-255.
        self._write(f"p{round(percent * 255 / 100):d}")
        self._last.needle_percent = percent

    @base.base_command
    def set_needle_for(self, percent, duration):
        """Set state of the needle valve 0-100%, close it when duration
        is exceeded.

        :param float percent: Percentage in range 0-100, where 0 is closed.
        :param float duration: Time interval in seconds; negative values
            are treated as 0.
        """
        # Convert the duration before touching the valve, so that a bad
        # argument cannot leave the valve open.
        hold = max(0, float(duration))
        self.set_needle(percent)
        try:
            time.sleep(hold)
        finally:
            # Close the valve even if the wait is interrupted or fails.
            self.set_needle(0)

    @base.base_command
    def close_needle(self):
        """Close the needle valve, shorthand for set_needle(0)."""
        self.set_needle(0)

    def get_needle(self):
        """Get current state of the needle valve 0-100%."""
        return self._last.needle_percent

    # ==== Methods/commands related to DeviceClient ====

    @base.idle_command
    def update_frontend(self, device_client):
        """Read all sensors and emit each reading as a "value" event.

        Emits, in order: cathode temperature, hottest coil, coldest coil,
        Hall probe field and the cached needle position.

        :param device_client: Object whose ``emit(event, payload)`` method
            receives the readings.
        """
        def make_value(value, formatted, label, minmax, ident):
            return {
                "value": value,
                "formatted": formatted.format(value),
                "label": label,
                "min": minmax[0],
                "max": minmax[1],
                "id": ident,
            }

        device_client.emit("value", make_value(
            self.temp_cathode(), "{:.1f} °C", "Cathode", [0, 100],
            "temp_cathode"
        ))

        temp_coils = self.temp_coils()
        # The coil number in the label is the 0-based position in the list.
        item = max(enumerate(temp_coils), key=lambda item: item[1])
        device_client.emit("value", make_value(
            item[1], "{:.1f} °C", f"Coil {item[0]}", [0, 300],
            "temp_coils_max"
        ))
        item = min(enumerate(temp_coils), key=lambda item: item[1])
        device_client.emit("value", make_value(
            item[1], "{:.1f} °C", f"Coil {item[0]}", [0, 300],
            "temp_coils_min"
        ))

        device_client.emit("value", make_value(
            self.measure_field(), "{:.1f} mT", "Hall probe", [0, 100],
            "hall"
        ))
        device_client.emit("value", make_value(
            self.get_needle(), "{:.1f} %", "Needle", [0, 100], "needle"
        ))