import time
import random
import math
import tempfile
from . import base
[docs]
class MockupDevice(base.HardwareBase):
    """A mockup class to demonstrate HardwareBase functionalities and the
    integration with DeviceClient.

    HardwareBase features:
      - Manual `connect`, `disconnect` or automatic via calling `is_ready`
      - Manual command `interrupt`
      - Automatic safety kill-switch called in the case of interrupt or error
      - Thread-safety = only one command at a time (via `@base_command`, ...)
        => this is required for calls made by DeviceClient
      - Info, warning, error logging (to console and file)

    DeviceClient features:
      - Autoconnect and check connection via calls to `is_ready`
      - Run user commands with parameters
      - Run custom periodic commands in background
      - Display info messages
      - Display a graph
    """

    def __init__(self, constants=None, **kwargs):
        """Create the mockup device.

        Args:
            constants: Optional mapping of simulation constants. When falsy,
                the defaults below are used. Keys read by this class:
                ``sample_count``, ``sample_dt``, ``peak_power`` and
                (optionally) ``t_rise``, ``t_tail``.
            **kwargs: Forwarded unchanged to ``base.HardwareBase.__init__``.
        """
        super().__init__(**kwargs)
        self._const = constants if constants else dict(
            sample_count=30,
            sample_dt=0.2,
            peak_power=500,
            t_rise=5,
            t_tail=3,
        )
        self._some_switch = False
        # Simulated loss of connection
        self._connection = dict(
            unstable=False,
            lost=False,
        )
        # Simulated device error queue
        self._device_errors = []
        # Simulated "discharge"
        self._discharge = None
        self.data_t = []
        self.data_p = []
        self.total_energy = 0.0

    # ==== Implementing abstract methods from parent class ====

    def _connect(self):
        """Simulate connecting; return True when the connection is up.

        The switch state is always reset. While the connection is marked
        unstable, a lost connection is restored only with probability 0.3.
        """
        self._some_switch = False
        if self._connection["unstable"]:
            if random.random() < 0.3:
                self._connection["lost"] = False
            return not self._connection["lost"]
        self._connection["lost"] = False
        return True

    def _disconnect(self):
        """Simulate disconnecting by marking the connection as lost."""
        self._connection["lost"] = True

    def _is_ready(self):
        """Return True when the simulated connection is not lost.

        While the connection is marked unstable, each call has a 4 % chance
        of stalling for one second and then dropping the connection.
        """
        if self._connection["unstable"]:
            if random.random() < 0.04:
                time.sleep(1)
                self._connection["lost"] = True
        return not self._connection["lost"]

    def _safestate(self):
        """Simulate a safety shutdown and discard the running discharge.

        Each message is sent through `raw_command`, followed by a random
        pause of 0.2 to 2.2 seconds.
        """
        commands = [
            "Initiating safety protocol AX14.E922",
            "Containment force-field is online",
            "Heating power at zero",
            "Electron temperature decreasing",
            "Magnetic field disengaged",
        ]
        for c in commands:
            self.raw_command(c)
            time.sleep(0.2 + 2*random.random())
        self._discharge = None

    def _readerror(self):
        """Return the most recently queued device error, or None if empty."""
        if self._device_errors:
            return self._device_errors.pop()
        return None

    # ==== New methods ====

    def is_some_switched_on(self):
        """Return the current state of the simulated switch."""
        return self._some_switch

    @base.base_command
    def set_some_switch(self, enabled):
        """Simulated: Set the switch state; log only when it changes."""
        if enabled != self._some_switch:
            self._some_switch = enabled
            self.log.info("Something switched " + ("on" if enabled else "off"))

    @base.base_command
    def toggle_connection_stability(self):
        """Simulated: Trigger someone-is-messing-with-the-cable problems."""
        self._connection["unstable"] = not self._connection["unstable"]
        if self._connection["unstable"]:
            self.log.warning("Connection UNSTABLE!")
        else:
            self.log.info("Connection stable!")

    @base.base_command
    def raw_command(self, command):
        """Simulated: Send a command to the device.

        Raises:
            base.TransmissionError: If the simulated connection is not ready.
        """
        if not self._is_ready():  # No need to do this in case of non-mockup hardware
            raise base.TransmissionError("Command not sent")
        time.sleep(0.1)
        self.log.info("(Command) " + command)

    @base.base_command
    def read_power_output(self):
        """Simulated: Measure power output, returns tuple (time, power).

        When no discharge is in progress, an error is queued for
        `_readerror` and ``(0, 0)`` is returned.

        Raises:
            base.TransmissionError: If the simulated connection is not ready.
        """
        if not self._is_ready():  # No need to do this in case of non-mockup hardware
            raise base.TransmissionError("Command not sent")
        if self._discharge:
            return self._discharge.measure_power()
        self._device_errors.append("Cannot read power output, no dischrage in progress")
        return (0, 0)

    @base.idle_command
    def update_frontend(self, device_client):
        """Update displayed power output data.

        Emits the elapsed flat-top time (only while a discharge exists and
        at least one sample is stored) and the last ``sample_count`` samples
        as a graph.
        """
        # Snapshot the reference so the check and the reads below agree.
        discharge = self._discharge
        # Guard against an empty buffer: data_t[-1] would raise IndexError.
        if discharge and self.data_t:
            duration = discharge.t_flat
            # Elapsed time within the flat phase, clamped to [0, duration].
            t = min(max(self.data_t[-1] - discharge.t_rise, 0), duration)
            device_client.emit("value", {
                "value": t,
                "formatted": "{:.2f} s".format(t),
                "label": "Time",
                "min": 0,
                "max": duration,
                "id": "elapsed"
            })
        n = self._const["sample_count"]
        device_client.emit("graph", {
            "title": "Power output",
            "x": self.data_t[-n:],
            "y": self.data_p[-n:],
            "xlabel": "t [s]",
            "ylabel": "P [MW]",
            "id": "graph_power"
        })

    @base.compound_command
    def initiate_discharge(self, device_client, duration):
        """Simulated: Discharge with a specified time duration. It just reads
        power output from device and emits some messages ('commands').

        Args:
            device_client: Frontend client used for live updates and for the
                final data file. May be falsy, in which case nothing is
                emitted to a frontend.
            duration: Length of the flat phase in seconds; converted with
                ``float()``. Non-positive values cancel the discharge.
        """
        n, dt = self._const["sample_count"], self._const["sample_dt"]
        duration = float(duration)
        if duration <= 0:
            self.log.error("Negative duration, discharge cancelled")
            return

        # Pre-fill the buffers with n zero-power samples so the graph has a
        # full window from the start. max() avoids dividing by zero when
        # sample_count is 1.
        step = dt * n / max(n - 1, 1)
        self.data_t = [step * i for i in range(n)]
        self.data_p = n * [0]

        # Start the discharge, honouring the configured rise/tail times
        # (falling back to the Discharge defaults for custom constant sets).
        self._discharge = Discharge(
            duration,
            self._const["peak_power"],
            t_rise=self._const.get("t_rise", 5),
            t_tail=self._const.get("t_tail", 3),
        )
        self._discharge.start()

        while self._discharge.phase != "none":
            t, p = self.read_power_output()
            self.data_t.append(t)
            self.data_p.append(p)
            if device_client:
                self.update_frontend(device_client)
            c = self._discharge.commentary()
            if c:
                self.raw_command(c)
            time.sleep(dt)

        # Get total energy (integrate power)
        self.total_energy = 0.0
        for t1, t2, p in zip(self.data_t[:-1], self.data_t[1:], self.data_p[1:]):
            self.total_energy += (t2-t1) * p
        self.log.info("Generated {:.2f} GJ".format(self.total_energy/1e3))

        # The client is optional in the loop above, so it must be optional
        # here too; otherwise a None client crashes after a complete run.
        if device_client:
            with tempfile.NamedTemporaryFile("w+") as file:
                file.writelines(
                    f"{t:.2f}, {p:.6e}\n"
                    for t, p in zip(self.data_t, self.data_p)
                )
                file.seek(0)
                device_client.emit_datafile(file, time.strftime("%Y-%m-%d_%H-%M-%S_my_discharge.txt"))
        self._discharge = None

    @base.base_command
    def emit_mockfile(self, device_client):
        """Simulated: Send a small fixed data file to the frontend."""
        with tempfile.NamedTemporaryFile("w+") as file:
            file.write("1.23 4.56\n")
            file.seek(0)
            device_client.emit_datafile(file, time.strftime("%Y-%m-%d_%H-%M-%S_my_mockfile.txt"))
[docs]
class Discharge:
    """Simulated discharge with a rise / flat / tail power profile.

    Phases, measured in seconds since `start`:
      - "rise": power grows linearly from 0 to `peak_power` over `t_rise`
      - "flat": power stays at `peak_power` for `t_flat`
      - "tail": power follows a quarter cosine down to 0 over `t_tail`
      - "none": not started yet, or finished
    """

    def __init__(self, duration, peak_power, t_rise=5, t_tail=3):
        """Store the profile parameters; the clock starts in `start`.

        Args:
            duration: Length of the flat phase in seconds.
            peak_power: Mean power during the flat phase.
            t_rise: Length of the rise phase in seconds.
            t_tail: Length of the tail phase in seconds.
        """
        self.t_flat = duration
        self.peak_power = peak_power
        self.t_rise = t_rise
        self.t_tail = t_tail
        # Set by start(); None marks a discharge that has not begun, so
        # `time` and `phase` are usable before start().
        self.t_0 = None
        self.comment_sequence = {}

    @property
    def time(self):
        """Seconds elapsed since `start`, or 0.0 if not started."""
        if self.t_0 is None:
            return 0.0
        return time.time() - self.t_0

    @property
    def phase(self):
        """Name of the current phase: "rise", "flat", "tail" or "none"."""
        if self.t_0 is None:
            return "none"
        return self._phase_at(self.time)

    def _phase_at(self, t):
        """Return the phase name for elapsed time `t`."""
        if 0 <= t < self.t_rise:
            return "rise"
        elif t < self.t_rise + self.t_flat:
            return "flat"
        elif t < self.t_rise + self.t_flat + self.t_tail:
            return "tail"
        else:
            return "none"

    def start(self):
        """Start the clock and (re)arm the per-phase commentary messages."""
        self.t_0 = time.time()
        self.comment_sequence = {
            "rise": "Discharge initiated, power increasing",
            "flat": "Peak fusion power reached",
            "tail": "Initiating power-down",
            "none": None,
        }

    def commentary(self):
        """Return the message for the current phase the first time it is
        asked for in that phase, and None on every later call.
        """
        # pop() makes each phase message one-shot, so a polling caller does
        # not repeat it on every sample.
        return self.comment_sequence.pop(self.phase, None)

    def measure_power(self):
        """Fancy time evolution of "power output" during a discharge. Returns
        a tuple (time, power).

        The power is drawn from a normal distribution centred on the profile
        value with a standard deviation of 8 % of that value.
        """
        # One clock reading, so the returned time and the phase used for the
        # power level always agree.
        t = self.time
        phase = "none" if self.t_0 is None else self._phase_at(t)
        # Evaluate only the active branch: building all of them eagerly
        # divides by zero whenever t_rise or t_tail is 0.
        if phase == "rise":
            shape = t / self.t_rise
        elif phase == "flat":
            shape = 1
        elif phase == "tail":
            shape = math.cos(0.5*math.pi * ((t-self.t_rise-self.t_flat)/self.t_tail))
        else:
            shape = 0
        mu = shape * self.peak_power
        return t, random.normalvariate(mu, 0.08*mu)