Source code for device.magnets_probes

import serial
import time
import pathlib
import numpy as np

from . import base


[docs] class MagnetsProbes(base.HardwareBase): """Set of 8 Arduino based measuring instruments (magnets experiment). :param buses list: specify the 8 names (str) of serial buses where Arduinos are connected, e.g. ``/dev/ttyUSB0`` Communication: - serial interface to 8+1 Arduino devices - this class mainly takes care of measurement sync between the 8+1 probes """ ALL_IDS = ["1", "2", "3", "4", "5", "6", "7", "8", "9"] def __init__(self, buses, **kwargs): """Constructor""" super().__init__(**kwargs) self._probe_ids = [] self._buses = tuple(buses) if not self._buses: raise ValueError("Param 'buses' cannot be an empty list") self._ardu = {} self._last = { "active_measurement": False, "measuretime_now": 0, "measuretime_total": 0, "samples_fetched": 0, "samples_total": 0, "active_plot": False, "plot_t0": 0, "plot_probe": 0, "plot_data": [[], [], []], } self._dir_saved_data = pathlib.Path.home() / "saved_data" # ==== Inherited abstract methods ==== def _connect(self): # NOTE More or less copy-paste from ArduinoMotorsBase, consider some # form universal connect() method that can be used everywhere. # (It should solve problem of connecting 1 or many arduinos, # reseting them and waiting for universal start-up sequence) def cleanup_and_log(msg, *args): self.log.debug(msg, *args) for ardu in temp.values(): ardu.close() self._ardu.clear() def reset(ardu): ardu.dtr = False time.sleep(0.1) ardu.reset_input_buffer() ardu.dtr = True temp = {} # Cycle 1: Connect, reset and let them initialize in parallel for bus in self._buses: try: ardu = serial.Serial(bus, 115200) except serial.SerialException: cleanup_and_log("Failed to open serial %s", bus) return False # Reset arduino and flush input buffer reset(ardu) temp[bus] = ardu # Cycle 2: Go through the start sequence for each arduino, get id codes for bus, ardu in temp.items(): # Wait for hello meassage == "Start\r\n" ardu.timeout = 5 recv = ardu.readline().strip().decode("ascii") if (recv != "Start"): cleanup_and_log("Unexpected start message '%s' from" " arduino at %s", recv, bus) return False # Set default timeout for normal communication ardu.timeout = 0.5 # Read ID message e.g. "1\r\n" probe_id = ardu.readline().strip().decode("ascii") msg = "Detected probe id '%s' at %s" if probe_id not in self.ALL_IDS: cleanup_and_log(msg + " - not valid id!", probe_id, bus) return False elif probe_id in self._ardu: cleanup_and_log(msg + " - duplicit id!", probe_id, bus) return False else: self.log.debug(msg + " - ok.", probe_id, bus) self._ardu[probe_id] = ardu # 2x info line (skip), then "setup ready\r\n" ardu.readline() self.log.debug(ardu.readline().strip().decode("ascii")) # size of SD card partition recv = ardu.readline().strip().decode("ascii") if (recv != "setup ready"): cleanup_and_log("Failed init sequence by arduino at %s", bus) return False # Keep a list of probe ids that is ordered according to self.ALL_IDS self._probe_ids = [i for i in self.ALL_IDS if i in self._ardu] return True def _disconnect(self): for key in self._ardu: self._ardu[key].flush() self._ardu[key].close() self._ardu.clear() self._probe_ids = [] def _is_ready(self): return True def _safestate(self): # Stop and cleanup after measure() (if it was running) # - _remove_last_data() removes only the current file or nothing, # no risk of removing any other files for probe_id in self._ardu: self._writeline(probe_id, "S") self._last["active_measurement"] = False self._last["samples_fetched"] = 0 self._remove_last_data() # If some command fails and stops in middle of communication, # make sure to clear serial in order to recover. for probe_id in self._ardu: self._clear_serial_queue(probe_id) self.stop_plot() # ==== Private commands ==== @base.base_command def _writeline(self, probe_id, line): # self.log.debug("%s@WRITE: '%s'", probe_id, line) self._ardu[probe_id].write((line + "\n").encode("ascii")) @base.base_command def _readline(self, probe_id): line = self._ardu[probe_id].readline().strip().decode("ascii") # self.log.debug("%s@READ: '%s'", probe_id, line) return line @base.base_command def _clear_serial_queue(self, probe_id): """Read lines from serial input until no input is availible.""" while self._readline(probe_id) != "": pass @base.base_command def _measure_data(self, dt, samples): """Trigger measurement (to SD card) with given configuration for each probe and wait for all to finish. Estimates sync delays between probes. Returns timestamp of first probe and numpy array of relative delays. :param dt int: time between samples in milliseconds (integer) :param samples int: number of samples """ # Set dt between samples, skip 1 response for probe_id in self.probe_ids: self._writeline(probe_id, "V{:d}".format(dt)) self._readline(probe_id) # response: "set dt to 123 ms" # Discharge (reset) integrators, skip 1 response for probe_id in self.probe_ids: self._writeline(probe_id, "R") self._readline(probe_id) # response: "discharged" # Run measurements, make note of start-time differences t0_delays = np.zeros(len(self.probe_ids)) for i, probe_id in enumerate(self.probe_ids): self._writeline(probe_id, "W{:d}".format(samples)) t0_delays[i] = time.perf_counter() t0 = time.time() # Calculate relative delays and start-time of first probe t0_delays -= t0_delays[0] t0 = t0 - t0_delays[-1] self.log.debug("t0 %.6f s, last delay %.3f ms", t0, t0_delays[-1] * 1e3) # Skip responses for i, probe_id in enumerate(self.probe_ids): self._readline(probe_id) self._readline(probe_id) # Wait for all measurements to finish self._last["measuretime_total"] = (dt * 1e-3) * (samples + 1) for probe_id in self.probe_ids: recv = "" while recv != "end of measurement": self._last["measuretime_now"] = time.time() - t0 recv = self._readline(probe_id) # New measurement finished, update sample size of current data self._last["samples_total"] = samples self._last["samples_fetched"] = 0 return t0, t0_delays @base.compound_command def _fetch_last_data(self): """Retrieve most recent data file and return it as numpy array. Columns: timestamp [us], integrated [V?], raw [V?] """ for pid in self.probe_ids: self._writeline(pid, "X-1") self._readline(pid) # skip filename # Note: Method _measure_data() ensures same number of samples per probe. # Fetch data from probes in parallel contents = dict.fromkeys(self.probe_ids, "") recv = {pid: self._readline(pid) for pid in self.probe_ids} while recv[self.probe_ids[0]] != "end of file": for pid in self.probe_ids: contents[pid] += recv[pid] + "," recv[pid] = self._readline(pid) self._last["samples_fetched"] += 1 def to_numpy(text_data): return np.fromstring(text_data, dtype=float, sep=',').reshape(-1, 3).T shape = to_numpy(contents[self.probe_ids[0]]).shape columns = [] for pid in self.ALL_IDS: if pid in contents: columns.append(to_numpy(contents[pid])) else: columns.append(np.full(shape, np.nan)) return np.concatenate(columns) @base.base_command def _remove_last_data(self): """Remove file with most recent data from SD card.""" for pid in self.probe_ids: self._writeline(pid, "D-1") self._readline(pid) # skip filename self._readline(pid) # skip "file removed" (or error) # ==== Commands ==== @property def probe_ids(self): return self._probe_ids
[docs] @base.compound_command # So that the plot stops updating in background def view_plot(self, probe_id): if probe_id not in self.probe_ids: raise base.CommandError("Use one of {}".format(self.probe_ids)) self._last["plot_t0"] = time.time() self._last["plot_probe"] = self.probe_ids.index(probe_id) self._last["plot_data"] = [[], [], []] self._last["active_plot"] = True
[docs] def stop_plot(self): self._last["active_plot"] = False
[docs] @base.compound_command def measure(self, device_client, duration, dt): """Trigger a short synchronized measurement using all probes. Data is saved to file and emitted to server. - Estimated error in time-sync between probes = 0.5ms - Time resolution ~ 0.1ms - Signal resolution ~ 0.0006 :param float duration: total duration of measurement [s], note that there is an upper limit 20_000 samples :param float dt: time between two samples [s], minimal is 1ms """ duration = abs(float(duration)) dt = max(1, int(float(dt) * 1000)) samples = int(duration / dt * 1000) if samples > 20_000: raise base.CommandError("Max 20,000 samples (requested {:,})".format(samples)) if samples <= 0: raise base.CommandError("Duration must be greater than dt") self._last["samples_fetched"] = 0 self._last["active_measurement"] = True self.log.info("Measuring {:,} samples, dt = {:.3f} s".format(samples, dt * 1e-3)) t0, t0_delays = self._measure_data(dt, samples) data = self._fetch_last_data() # Convert micros -> seconds, use measured delays to sync probes for i, delay in enumerate(t0_delays): data[3*i] = (data[3*i] * 1e-6) + delay # Prepare header, filepath and save header = "timestamp {:.4f}\n".format(t0) header += ", ".join("time_{0}[s], integ_{0}, raw_{0}".format(probe_id) for probe_id in self.probe_ids) filepath = "magnets_probes_{}_{{}}.txt".format(time.strftime("%Y-%m-%d_%H-%M")) suffix = 1 while (self._dir_saved_data / filepath.format(suffix)).exists(): suffix += 1 filepath = self._dir_saved_data / filepath.format(suffix) np.savetxt(str(filepath), data.T, fmt="%.4f", header=header) # Clean-up SD cards (at this point the data is safely saved in local file) self._remove_last_data() # Emit to server if device_client is not None: with filepath.open("r") as file: device_client.emit_datafile(file, filepath.name) # Plot data preview def get_graph_data(idx, data): thin = 1 + data.shape[1]//500 # downsample to 250-500 values return data[3*idx : 3*idx+3, ::thin].tolist() self._last["plot_data"] = get_graph_data(self._last["plot_probe"], data) self._emit_graphs(device_client, note="[data preview]") self.stop_plot()
[docs] @base.base_command def measure_single(self): """Measure one sample on each probe. Returns numpy array containing integrated and raw value for each probe, shape=(8, 2). Order of the probes corresponds to the property ``probe_ids``. .. note:: This method is useful mainly for realtime plotting and does not guarantee best precision (time-sync, sampling rate). See measure(). """ data = np.full((len(self.probe_ids), 2), fill_value=np.nan, dtype=float) for i, probe_id in enumerate(self.probe_ids): self._writeline(probe_id, "Z") data[i, :] = np.fromstring(self._readline(probe_id), dtype=float, sep=' ') return np.array(data)
# ==== Methods/commands related to DeviceClient ==== @base.idle_command(busy_return=False) def _idle_update_plotdata(self, probe_index): if not self._last["active_plot"]: return False integ, raw = self.measure_single()[probe_index] self._last["plot_data"][0].append(time.time() - self._last["plot_t0"]) self._last["plot_data"][1].append(integ) self._last["plot_data"][2].append(raw) if len(self._last["plot_data"][0]) > 100: self._last["plot_data"][0].pop(0) self._last["plot_data"][1].pop(0) self._last["plot_data"][2].pop(0) return True def _emit_graphs(self, device_client, note=""): probe_id = self.probe_ids[self._last["plot_probe"]] note = " " + note if note != "" else "" device_client.emit("graph", { "title": "Probe {} integrated".format(probe_id) + note, "x": self._last["plot_data"][0], "y": self._last["plot_data"][1], "xlabel": "t [s]", "ylabel": "[V?]", "id": "graph_integrated" }) device_client.emit("graph", { "title": "Probe {} raw".format(probe_id) + note, "x": self._last["plot_data"][0], "y": self._last["plot_data"][2], "xlabel": "t [s]", "ylabel": "[V?]", "id": "graph_raw" })
[docs] def update_frontend(self, device_client): # First time only - init the element on frontend if self._last["samples_total"] == 0: self._last["samples_total"] = 1 device_client.emit("value", {"label": "Status", "id": "progress", "formatted": "00:00", "value": 0, "min": 0, "max": 1}) if self._last["active_measurement"]: if self._last["samples_fetched"] == 0: value = self._last["measuretime_now"] device_client.emit("value", { "value": 0.5 * value, "formatted": time.strftime("%M:%S", time.gmtime(value)), "label": "Status", "min": 0, "max": self._last["measuretime_total"], "id": "progress", }) else: value = (100 * self._last["samples_fetched"]) // self._last["samples_total"] device_client.emit("value", { "value": 50 + 0.5 * value, "formatted": "data {:.0f}%".format(value), "label": "Status", "min": 0, "max": 100, "id": "progress", }) if value == 100: self._last["active_measurement"] = False if self._idle_update_plotdata(self._last["plot_probe"]): self._emit_graphs(device_client)