# Source code for device.mockup

import time
import random
import math
import tempfile

from . import base


class MockupDevice(base.HardwareBase):
    """A mockup class to demonstrate HardwareBase functionalities and the
    integration with DeviceClient.

    HardwareBase features:
    - Manual `connect`, `disconnect` or automatic via calling `is_ready`
    - Manual command `interrupt`
    - Automatic safety kill-switch called in the case of interrupt or error
    - Thread-safety = only one command at a time (via `@base_command`, ...)
      => this is required for calls made by DeviceClient
    - Info, warning, error logging (to console and file)

    DeviceClient features:
    - Autoconnect and check connection via calls to `is_ready`
    - Run user commands with parameters
    - Run custom periodic commands in background
    - Display info messages
    - Display a graph
    """

    def __init__(self, constants=None, **kwargs):
        """Create the mockup device.

        Args:
            constants: Optional dict of simulation constants. When it is
                missing or empty, the defaults below are used. The keys
                read by this class are ``sample_count``, ``sample_dt`` and
                ``peak_power``; ``t_rise`` and ``t_tail`` are optional.
            **kwargs: Forwarded unchanged to ``base.HardwareBase``.
        """
        super().__init__(**kwargs)
        self._const = constants if constants else dict(
            sample_count=30,
            sample_dt=0.2,
            peak_power=500,
            t_rise=5,
            t_tail=3,
        )
        self._some_switch = False

        # Simulated loss of connection
        self._connection = dict(
            unstable=False,
            lost=False,
        )

        # Simulated device error queue
        self._device_errors = []

        # Simulated "discharge"
        self._discharge = None
        self.data_t = []
        self.data_p = []
        self.total_energy = 0.0

    # ==== Implementing abstract methods from parent class ====

    def _connect(self):
        """Simulate connecting; return True when the link is up.

        With an unstable connection a lost link is restored only with
        probability 0.3 per attempt; otherwise connecting always succeeds.
        The mock switch is reset on every attempt.
        """
        self._some_switch = False
        if self._connection["unstable"]:
            if random.random() < 0.3:
                self._connection["lost"] = False
            return not self._connection["lost"]
        self._connection["lost"] = False
        return True

    def _disconnect(self):
        """Simulate disconnecting by marking the link as lost."""
        self._connection["lost"] = True

    def _is_ready(self):
        """Return True while the simulated link is up.

        With an unstable connection each check has a 4 % chance of
        stalling for one second and then dropping the link.
        """
        if self._connection["unstable"]:
            if random.random() < 0.04:
                time.sleep(1)
                self._connection["lost"] = True
        return not self._connection["lost"]

    def _safestate(self):
        """Simulate the safety shutdown sequence and drop the discharge."""
        commands = [
            "Initiating safety protocol AX14.E922",
            "Containment force-field is online",
            "Heating power at zero",
            "Electron temperature decreasing",
            "Magnetic field disengaged",
        ]
        for c in commands:
            self.raw_command(c)
            time.sleep(0.2 + 2*random.random())
        self._discharge = None

    def _readerror(self):
        """Return the most recently queued device error, or None if empty."""
        if self._device_errors:
            return self._device_errors.pop()
        return None

    # ==== New methods ====

    def is_some_switched_on(self):
        """Return the current state of the mock switch."""
        return self._some_switch

    @base.base_command
    def set_some_switch(self, enabled):
        """Simulated: Set the mock switch; logs only when the state changes."""
        if enabled != self._some_switch:
            self._some_switch = enabled
            self.log.info("Something switched " + ("on" if enabled else "off"))

    @base.base_command
    def toggle_connection_stability(self):
        """Simulated: Trigger someone-is-messing-with-the-cable problems."""
        self._connection["unstable"] = not self._connection["unstable"]
        if self._connection["unstable"]:
            self.log.warning("Connection UNSTABLE!")
        else:
            self.log.info("Connection stable!")

    @base.base_command
    def raw_command(self, command):
        """Simulated: Send a command to the device.

        Raises:
            base.TransmissionError: If the simulated link is down.
        """
        if not self._is_ready():
            # No need to do this in case of non-mockup hardware
            raise base.TransmissionError("Command not sent")
        time.sleep(0.1)
        self.log.info("(Command) " + command)

    @base.base_command
    def read_power_output(self):
        """Simulated: Measure power output, returns tuple (time, power).

        When no discharge is in progress an error is queued for
        `_readerror` and ``(0, 0)`` is returned.

        Raises:
            base.TransmissionError: If the simulated link is down.
        """
        if not self._is_ready():
            # No need to do this in case of non-mockup hardware
            raise base.TransmissionError("Command not sent")
        if self._discharge:
            return self._discharge.measure_power()
        # NOTE(review): the "dischrage" typo is kept byte-for-byte because
        # consumers of the error queue may match on the message text.
        self._device_errors.append("Cannot read power output, no dischrage in progress")
        return (0, 0)

    @base.idle_command
    def update_frontend(self, device_client):
        """Update displayed power output data.

        Emits the elapsed flat-top time while a discharge is running, and
        always emits the last ``sample_count`` points of the power graph.
        """
        if self._discharge:
            duration = self._discharge.t_flat
            # Time spent in the flat-top phase, clamped to [0, duration].
            t = min(max(self.data_t[-1]-self._discharge.t_rise, 0), duration)
            device_client.emit("value", {
                "value": t,
                "formatted": "{:.2f} s".format(t),
                "label": "Time",
                "min": 0,
                "max": duration,
                "id": "elapsed"
            })
        n = self._const["sample_count"]
        device_client.emit("graph", {
            "title": "Power output",
            "x": self.data_t[-n:],
            "y": self.data_p[-n:],
            "xlabel": "t [s]",
            "ylabel": "P [MW]",
            "id": "graph_power"
        })

    @base.compound_command
    def initiate_discharge(self, device_client, duration):
        """Simulated: Discharge with a specified time duration.

        It just reads power output from the device and emits some messages
        ('commands'). Afterwards the total energy is integrated, logged and
        stored in ``total_energy``, and the recorded data is sent to the
        client as a data file.

        Args:
            device_client: Client used for frontend updates and the data
                file. May be falsy (e.g. None); all client output is then
                skipped.
            duration: Flat-top duration in seconds (anything ``float()``
                accepts). Non-positive values cancel the discharge.
        """
        n, dt = self._const["sample_count"], self._const["sample_dt"]
        duration = float(duration)
        if duration <= 0:
            self.log.error("Negative duration, discharge cancelled")
            return

        def start():
            # Pre-fill the graph with n zero-power points. max() avoids a
            # division by zero when sample_count is 1.
            step = dt*n/max(n-1, 1)
            self.data_t = [step*i for i in range(n)]
            self.data_p = n * [0]
            # Forward the optional pulse-shape constants, which were
            # previously ignored; Discharge keeps its own defaults for
            # any key that is absent.
            shape = {
                key: self._const[key]
                for key in ("t_rise", "t_tail")
                if key in self._const
            }
            self._discharge = Discharge(duration, self._const["peak_power"], **shape)
            self._discharge.start()

        def update_data():
            t, p = self.read_power_output()
            self.data_t.append(t)
            self.data_p.append(p)

        # Start the discharge
        start()
        while self._discharge.phase != "none":
            update_data()
            if device_client:
                self.update_frontend(device_client)
            c = self._discharge.commentary()
            if c:
                self.raw_command(c)
            time.sleep(dt)

        # Get total energy (integrate power). Only measured samples are
        # used: the first n entries are graph padding, and stepping from
        # the last padding time back to the first measured time would add
        # a spurious negative term.
        measured_t = self.data_t[n:]
        measured_p = self.data_p[n:]
        self.total_energy = sum(
            ((t2-t1) * p
             for t1, t2, p in zip(measured_t, measured_t[1:], measured_p[1:])),
            0.0,
        )
        self.log.info("Generated {:.2f} GJ".format(self.total_energy/1e3))

        # The loop above already tolerates a missing client; do the same
        # here instead of failing with AttributeError on None.
        if device_client:
            with tempfile.NamedTemporaryFile("w+") as file:
                file.writelines(
                    f"{t:.2f}, {p:.6e}\n"
                    for t, p in zip(self.data_t, self.data_p)
                )
                file.seek(0)
                device_client.emit_datafile(file, time.strftime("%Y-%m-%d_%H-%M-%S_my_discharge.txt"))
        self._discharge = None

    @base.base_command
    def emit_mockfile(self, device_client):
        """Simulated: Send a tiny one-line data file to the client."""
        with tempfile.NamedTemporaryFile("w+") as file:
            file.write("1.23 4.56\n")
            file.seek(0)
            device_client.emit_datafile(file, time.strftime("%Y-%m-%d_%H-%M-%S_my_mockfile.txt"))
class Discharge:
    """Simulated discharge with a rise / flat-top / tail power profile.

    The phase is derived from the wall-clock time elapsed since `start`:
    "rise" for ``t_rise`` seconds, "flat" for ``t_flat`` seconds, "tail"
    for ``t_tail`` seconds, then "none". Before `start` is called the
    phase is "none".
    """

    def __init__(self, duration, peak_power, t_rise=5, t_tail=3):
        """
        Args:
            duration: Length of the flat-top phase in seconds.
            peak_power: Mean power during the flat-top phase.
            t_rise: Length of the linear ramp-up in seconds.
            t_tail: Length of the cosine ramp-down in seconds.
        """
        self.t_flat = duration
        self.peak_power = peak_power
        self.t_rise = t_rise
        self.t_tail = t_tail
        # Start timestamp (time.time()); None until start() is called.
        self.t_0 = None
        self.comment_sequence = {}

    def _phase_at(self, t):
        """Return the phase name for elapsed time ``t``."""
        if self.t_0 is None:
            return "none"
        if 0 <= t < self.t_rise:
            return "rise"
        if t < self.t_rise + self.t_flat:
            return "flat"
        if t < self.t_rise + self.t_flat + self.t_tail:
            return "tail"
        return "none"

    @property
    def time(self):
        """Seconds elapsed since `start`; 0.0 if not started yet."""
        if self.t_0 is None:
            return 0.0
        return time.time() - self.t_0

    @property
    def phase(self):
        """Current phase: "rise", "flat", "tail" or "none"."""
        return self._phase_at(self.time)

    def start(self):
        """Start (or restart) the discharge clock and re-arm all messages."""
        self.t_0 = time.time()
        self.comment_sequence = {
            "rise": "Discharge initiated, power increasing",
            "flat": "Peak fusion power reached",
            "tail": "Initiating power-down",
            "none": None,
        }

    def measure_power(self):
        """Fancy time evolution of "power output" during a discharge.

        Returns a tuple (time, power). The power is drawn from a normal
        distribution around the profile value, with a standard deviation
        of 8 % of that value.
        """
        # Sample the clock once so that time and phase always agree.
        t = self.time
        phase = self._phase_at(t)
        # Only the active branch is evaluated, so t_rise == 0 or
        # t_tail == 0 cannot cause a division by zero.
        if phase == "rise":
            level = t / self.t_rise
        elif phase == "flat":
            level = 1
        elif phase == "tail":
            level = math.cos(0.5*math.pi * ((t-self.t_rise-self.t_flat)/self.t_tail))
        else:
            level = 0
        mu = level * self.peak_power
        return t, random.normalvariate(mu, 0.08*mu)

    def commentary(self):
        """Fancy messages that are triggered during the discharge.

        Each phase's message is returned once. Returns None when no new
        message is available.
        """
        # Read the time-dependent phase exactly once. Reading it again for
        # the write could clear the *next* phase's message unseen.
        phase = self.phase
        c = self.comment_sequence.get(phase)
        if c is not None:
            self.comment_sequence[phase] = None
        return c