import serial
import time
import pathlib
import numpy as np
from . import base
[docs]
class MagnetsProbes(base.HardwareBase):
    """Set of 8 Arduino based measuring instruments (magnets experiment).

    :param buses list: specify the 8 names (str) of serial buses where
        Arduinos are connected, e.g. ``/dev/ttyUSB0``

    Communication:

    - serial interface to 8+1 Arduino devices
    - this class mainly takes care of measurement sync between the 8+1 probes
    """

    # Id codes a probe may report during the connect handshake. The order of
    # this list also defines the order of ``probe_ids`` and of fetched data.
    ALL_IDS = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
def __init__(self, buses, **kwargs):
    """Remember the serial bus names and set up the status bookkeeping.

    :param buses: iterable with the names of the serial buses to open later
    :param kwargs: forwarded unchanged to the parent constructor
    :raises ValueError: when ``buses`` contains no entry
    """
    super().__init__(**kwargs)
    self._probe_ids = []
    bus_names = tuple(buses)
    self._buses = bus_names
    if len(bus_names) == 0:
        raise ValueError("Param 'buses' cannot be an empty list")
    # Open serial connections, keyed by the id each probe reports.
    self._ardu = {}
    # Progress and plot state shared between the commands and the frontend.
    self._last = dict(
        active_measurement=False,
        measuretime_now=0,
        measuretime_total=0,
        samples_fetched=0,
        samples_total=0,
        active_plot=False,
        plot_t0=0,
        plot_probe=0,
        plot_data=[[], [], []],
    )
    self._dir_saved_data = pathlib.Path.home().joinpath("saved_data")
# ==== Inherited abstract methods ====
def _connect(self):
    """Open all configured serial buses and run the probe start-up handshake.

    Every bus is opened at 115200 baud and the device is reset through the
    DTR line. Each device is then expected to send ``Start``, its id (one of
    ``ALL_IDS``), two info lines and finally ``setup ready``.

    :return: ``True`` when every device completed the handshake, ``False``
        otherwise. On failure every port opened here is closed again and
        ``self._ardu`` is emptied.
    """
    # NOTE More or less copy-paste from ArduinoMotorsBase, consider some
    # form of universal connect() method that can be used everywhere.
    # (It should solve the problem of connecting 1 or many arduinos,
    # resetting them and waiting for a universal start-up sequence)
    opened = {}

    def cleanup_and_log(msg, *args):
        self.log.debug(msg, *args)
        for ardu in opened.values():
            ardu.close()
        self._ardu.clear()

    def reset(ardu):
        ardu.dtr = False
        time.sleep(0.1)
        ardu.reset_input_buffer()
        ardu.dtr = True

    def read_text(ardu):
        # Bytes that are not ASCII (e.g. line noise right after a reset) must
        # not raise here: they are replaced so that the comparison with the
        # expected message fails and the regular cleanup path is taken.
        return ardu.readline().strip().decode("ascii", errors="replace")

    try:
        # Cycle 1: Connect, reset and let them initialize in parallel
        for bus in self._buses:
            try:
                ardu = serial.Serial(bus, 115200)
            except serial.SerialException:
                cleanup_and_log("Failed to open serial %s", bus)
                return False
            # Register the port before touching it, so that it gets closed
            # even when the reset itself fails.
            opened[bus] = ardu
            # Reset arduino and flush input buffer
            reset(ardu)

        # Cycle 2: Go through the start sequence for each arduino, get id codes
        for bus, ardu in opened.items():
            # Wait for hello message == "Start\r\n"
            ardu.timeout = 5
            recv = read_text(ardu)
            if recv != "Start":
                cleanup_and_log("Unexpected start message '%s' from"
                                " arduino at %s", recv, bus)
                return False
            # Set default timeout for normal communication
            ardu.timeout = 0.5
            # Read ID message e.g. "1\r\n"
            probe_id = read_text(ardu)
            msg = "Detected probe id '%s' at %s"
            if probe_id not in self.ALL_IDS:
                cleanup_and_log(msg + " - not valid id!", probe_id, bus)
                return False
            if probe_id in self._ardu:
                cleanup_and_log(msg + " - duplicit id!", probe_id, bus)
                return False
            self.log.debug(msg + " - ok.", probe_id, bus)
            self._ardu[probe_id] = ardu
            # 2x info line (first skipped, second logged), then "setup ready\r\n"
            ardu.readline()
            self.log.debug(read_text(ardu))  # size of SD card partition
            recv = read_text(ardu)
            if recv != "setup ready":
                cleanup_and_log("Failed init sequence by arduino at %s", bus)
                return False
    except serial.SerialException as err:
        # A port that fails in the middle of the handshake is handled like a
        # port that could not be opened: close everything, report failure.
        cleanup_and_log("Serial error during init sequence: %s", err)
        return False

    # Keep a list of probe ids that is ordered according to self.ALL_IDS
    self._probe_ids = [i for i in self.ALL_IDS if i in self._ardu]
    return True
def _disconnect(self):
for key in self._ardu:
self._ardu[key].flush()
self._ardu[key].close()
self._ardu.clear()
self._probe_ids = []
def _is_ready(self):
return True
def _safestate(self):
# Stop and cleanup after measure() (if it was running)
# - _remove_last_data() removes only the current file or nothing,
# no risk of removing any other files
for probe_id in self._ardu:
self._writeline(probe_id, "S")
self._last["active_measurement"] = False
self._last["samples_fetched"] = 0
self._remove_last_data()
# If some command fails and stops in middle of communication,
# make sure to clear serial in order to recover.
for probe_id in self._ardu:
self._clear_serial_queue(probe_id)
self.stop_plot()
# ==== Private commands ====
@base.base_command
def _writeline(self, probe_id, line):
    """Send ``line`` followed by a newline to the given probe, ASCII encoded.

    :param probe_id: key of the probe in ``self._ardu``
    :param line: text of the command, without the line terminator
    """
    port = self._ardu[probe_id]
    payload = (line + "\n").encode("ascii")
    port.write(payload)
@base.base_command
def _readline(self, probe_id):
    """Read one line from the given probe and return it as stripped text.

    :param probe_id: key of the probe in ``self._ardu``
    :return: the received line, ASCII decoded, without surrounding whitespace
    """
    raw = self._ardu[probe_id].readline()
    text = raw.strip().decode("ascii")
    return text
@base.base_command
def _clear_serial_queue(self, probe_id):
    """Read lines from serial input until no input is available."""
    while True:
        if self._readline(probe_id) == "":
            break
@base.base_command
def _measure_data(self, dt, samples):
    """Run one measurement (stored on SD card) on all probes and wait for it.

    Every probe is configured and triggered in turn; the time at which each
    trigger was sent is recorded to estimate the sync delay between probes.

    :param dt int: time between samples in milliseconds (integer)
    :param samples int: number of samples
    :return: tuple ``(t0, t0_delays)`` - wall-clock start time of the first
        probe and a numpy array with the trigger delay of each probe
        relative to the first one, in seconds
    """
    probes = self.probe_ids

    # Set dt between samples, skip 1 response
    for pid in probes:
        self._writeline(pid, f"V{dt:d}")
        self._readline(pid)  # response: "set dt to 123 ms"

    # Discharge (reset) integrators, skip 1 response
    for pid in probes:
        self._writeline(pid, "R")
        self._readline(pid)  # response: "discharged"

    # Run measurements, make note of start-time differences
    trigger_marks = np.zeros(len(probes))
    for idx, pid in enumerate(probes):
        self._writeline(pid, f"W{samples:d}")
        trigger_marks[idx] = time.perf_counter()
    wall_after_last = time.time()

    # Calculate relative delays and start-time of first probe
    t0_delays = trigger_marks - trigger_marks[0]
    t0 = wall_after_last - t0_delays[-1]
    self.log.debug("t0 %.6f s, last delay %.3f ms", t0, t0_delays[-1] * 1e3)

    # Skip the two responses each probe sends after the trigger
    for pid in probes:
        self._readline(pid)
        self._readline(pid)

    # Wait for all measurements to finish
    # NOTE(review): there is no upper bound on this wait; a probe that never
    # reports "end of measurement" keeps the loop running - confirm whether
    # a timeout is wanted.
    self._last["measuretime_total"] = (dt * 1e-3) * (samples + 1)
    for pid in probes:
        while True:
            self._last["measuretime_now"] = time.time() - t0
            if self._readline(pid) == "end of measurement":
                break

    # New measurement finished, update sample size of current data
    self._last["samples_total"] = samples
    self._last["samples_fetched"] = 0
    return t0, t0_delays
@base.compound_command
def _fetch_last_data(self):
    """Retrieve most recent data file and return it as numpy array.

    The result has three rows for every entry of ``ALL_IDS``, in that order:
    timestamp [us], integrated [V?], raw [V?]. Rows of probes that are not
    connected are filled with NaN.

    ``self._last["samples_fetched"]`` is increased by one for every line
    read, which lets the frontend show the transfer progress.
    """
    probe_ids = self.probe_ids
    for pid in probe_ids:
        self._writeline(pid, "X-1")
        self._readline(pid)  # skip filename

    # Note: Method _measure_data() ensures same number of samples per probe.
    # Fetch data from probes in parallel. Lines are collected in lists and
    # joined once at the end; appending to one growing string per probe
    # would copy the whole text again for every received line.
    lines = {pid: [] for pid in probe_ids}
    recv = {pid: self._readline(pid) for pid in probe_ids}
    while recv[probe_ids[0]] != "end of file":
        for pid in probe_ids:
            lines[pid].append(recv[pid])
            recv[pid] = self._readline(pid)
        self._last["samples_fetched"] += 1

    def to_numpy(rows):
        text = ",".join(rows)
        return np.fromstring(text, dtype=float, sep=',').reshape(-1, 3).T

    # Parse the text of every probe exactly once
    parsed = {pid: to_numpy(rows) for pid, rows in lines.items()}
    shape = parsed[probe_ids[0]].shape
    columns = [
        parsed[pid] if pid in parsed else np.full(shape, np.nan)
        for pid in self.ALL_IDS
    ]
    return np.concatenate(columns)
@base.base_command
def _remove_last_data(self):
    """Remove file with most recent data from SD card of every probe."""
    for probe_id in self.probe_ids:
        self._writeline(probe_id, "D-1")
        # Two responses are discarded: the filename and then
        # "file removed" (or an error message).
        for _ in range(2):
            self._readline(probe_id)
# ==== Commands ====
@property
def probe_ids(self):
    """List of ids of the connected probes (the internal list, not a copy)."""
    return self._probe_ids
[docs]
@base.compound_command  # So that the plot stops updating in background
def view_plot(self, probe_id):
    """Start a fresh live plot for the given probe.

    :param probe_id: id of a connected probe, one of ``probe_ids``
    :raises base.CommandError: when ``probe_id`` is not a connected probe
    """
    known = self.probe_ids
    if probe_id not in known:
        raise base.CommandError("Use one of {}".format(self.probe_ids))
    # The plot is activated last, after its time origin, probe index and
    # empty data series are in place.
    self._last.update(
        plot_t0=time.time(),
        plot_probe=known.index(probe_id),
        plot_data=[[], [], []],
        active_plot=True,
    )
[docs]
def stop_plot(self):
    """Deactivate the live plot by clearing the ``active_plot`` flag."""
    self._last.update(active_plot=False)
[docs]
@base.compound_command
def measure(self, device_client, duration, dt):
    """Trigger a short synchronized measurement using all probes. Data is
    saved to file and emitted to server.

    - Estimated error in time-sync between probes = 0.5ms
    - Time resolution ~ 0.1ms
    - Signal resolution ~ 0.0006

    The saved file has three columns (time, integrated, raw) for every entry
    of ``ALL_IDS``; columns of probes that are not connected contain NaN.

    :param device_client: client used to emit the data file and the preview
        graphs; may be ``None``, in which case nothing is emitted
    :param float duration: total duration of measurement [s], note that
        there is an upper limit 20_000 samples
    :param float dt: time between two samples [s], minimal is 1ms
    :raises base.CommandError: when the number of samples is not within
        1 .. 20,000
    """
    duration = abs(float(duration))
    dt = max(1, int(float(dt) * 1000))  # seconds -> whole milliseconds
    samples = int(duration / dt * 1000)
    if samples > 20_000:
        raise base.CommandError("Max 20,000 samples (requested {:,})".format(samples))
    if samples <= 0:
        raise base.CommandError("Duration must be greater than dt")

    self._last["samples_fetched"] = 0
    self._last["active_measurement"] = True
    self.log.info("Measuring {:,} samples, dt = {:.3f} s".format(samples, dt * 1e-3))
    t0, t0_delays = self._measure_data(dt, samples)
    data = self._fetch_last_data()

    # The fetched data has 3 rows per entry of ALL_IDS, while t0_delays has
    # one entry per *connected* probe. Map each connected probe to its first
    # row so that the right rows are used when some probes are missing.
    first_row = {pid: 3 * self.ALL_IDS.index(pid) for pid in self.probe_ids}

    # Convert micros -> seconds, use measured delays to sync probes
    for pid, delay in zip(self.probe_ids, t0_delays):
        row = first_row[pid]
        data[row] = (data[row] * 1e-6) + delay

    # Prepare header (one label per saved column), filepath and save
    header = "timestamp {:.4f}\n".format(t0)
    header += ", ".join("time_{0}[s], integ_{0}, raw_{0}".format(pid)
                        for pid in self.ALL_IDS)
    # Make sure the target directory exists before the data is written
    self._dir_saved_data.mkdir(parents=True, exist_ok=True)
    name_template = "magnets_probes_{}_{{}}.txt".format(time.strftime("%Y-%m-%d_%H-%M"))
    suffix = 1
    while (self._dir_saved_data / name_template.format(suffix)).exists():
        suffix += 1
    filepath = self._dir_saved_data / name_template.format(suffix)
    np.savetxt(str(filepath), data.T, fmt="%.4f", header=header)

    # Clean-up SD cards (at this point the data is safely saved in local file)
    self._remove_last_data()

    # Emit to server
    if device_client is not None:
        with filepath.open("r") as file:
            device_client.emit_datafile(file, filepath.name)

    # Plot data preview of the probe selected for plotting; "plot_probe" is
    # an index into probe_ids, so it is translated to that probe's rows.
    plot_row = first_row[self.probe_ids[self._last["plot_probe"]]]
    thin = 1 + data.shape[1] // 500  # downsample to 250-500 values
    self._last["plot_data"] = data[plot_row:plot_row + 3, ::thin].tolist()
    if device_client is not None:
        self._emit_graphs(device_client, note="[data preview]")
    self.stop_plot()
[docs]
@base.base_command
def measure_single(self):
    """Measure one sample on each probe.

    Returns a numpy array holding the integrated and raw value of each
    connected probe, shape=(len(probe_ids), 2). Order of the rows
    corresponds to the property ``probe_ids``.

    .. note::
        This method is useful mainly for realtime plotting and does not
        guarantee best precision (time-sync, sampling rate). See measure().
    """
    probes = self.probe_ids
    readings = np.full((len(probes), 2), fill_value=np.nan, dtype=float)
    for row, pid in enumerate(probes):
        self._writeline(pid, "Z")
        answer = self._readline(pid)
        readings[row, :] = np.fromstring(answer, dtype=float, sep=' ')
    return np.array(readings)
# ==== Methods/commands related to DeviceClient ====
@base.idle_command(busy_return=False)
def _idle_update_plotdata(self, probe_index):
    """Append one fresh sample of the given probe to the live plot data.

    :param probe_index: row of the probe in the result of measure_single()
    :return: ``False`` when the live plot is not active, ``True`` after a
        sample was added
    """
    if not self._last["active_plot"]:
        return False
    integ, raw = self.measure_single()[probe_index]
    elapsed = time.time() - self._last["plot_t0"]
    series = self._last["plot_data"]
    series[0].append(elapsed)
    series[1].append(integ)
    series[2].append(raw)
    # Once more than 100 points are held, drop the oldest one of each series
    if len(series[0]) > 100:
        for column in (series[0], series[1], series[2]):
            column.pop(0)
    return True
def _emit_graphs(self, device_client, note=""):
probe_id = self.probe_ids[self._last["plot_probe"]]
note = " " + note if note != "" else ""
device_client.emit("graph", {
"title": "Probe {} integrated".format(probe_id) + note,
"x": self._last["plot_data"][0],
"y": self._last["plot_data"][1],
"xlabel": "t [s]",
"ylabel": "[V?]",
"id": "graph_integrated"
})
device_client.emit("graph", {
"title": "Probe {} raw".format(probe_id) + note,
"x": self._last["plot_data"][0],
"y": self._last["plot_data"][2],
"xlabel": "t [s]",
"ylabel": "[V?]",
"id": "graph_raw"
})
[docs]
def update_frontend(self, device_client):
    """Emit the progress indicator and, if active, the live plot graphs.

    While a measurement is active the progress covers two phases: the first
    half tracks the elapsed measuring time, the second half the share of
    samples already fetched. The measurement is marked inactive once the
    fetched share reaches 100.

    :param device_client: object whose ``emit(event, payload)`` is called
    """
    state = self._last

    # First time only - init the element on frontend
    if state["samples_total"] == 0:
        state["samples_total"] = 1
        device_client.emit("value", {"label": "Status", "id": "progress",
                                     "formatted": "00:00", "value": 0, "min": 0, "max": 1})

    if state["active_measurement"]:
        if state["samples_fetched"] != 0:
            # Second phase: data transfer, shown as 50..100 of the bar
            percent = (100 * state["samples_fetched"]) // state["samples_total"]
            device_client.emit("value", {
                "value": 50 + 0.5 * percent,
                "formatted": "data {:.0f}%".format(percent),
                "label": "Status",
                "min": 0, "max": 100,
                "id": "progress",
            })
            if percent == 100:
                state["active_measurement"] = False
        else:
            # First phase: measurement running, shown as the first half
            elapsed = state["measuretime_now"]
            device_client.emit("value", {
                "value": 0.5 * elapsed,
                "formatted": time.strftime("%M:%S", time.gmtime(elapsed)),
                "label": "Status",
                "min": 0, "max": state["measuretime_total"],
                "id": "progress",
            })

    if self._idle_update_plotdata(state["plot_probe"]):
        self._emit_graphs(device_client)