Source code for device.base.inomotorbase

import serial

import os
import glob
import time
import configparser

from . import base


[docs] class ArduinoMotorsBase(base.HardwareBase): """Base class for ...""" def __init__(self, calibfile, buses=None, limits_enabled=True, **kwargs): super().__init__(**kwargs) self._calib = MotorsCalibration(calibfile, limits_enabled) # Automatically search for USB serial devices if none were specified if buses is None: buses = glob.glob("/dev/ttyUSB*") if len(buses) != len(self.motor_ids): msg = "Expected {} usb serial ports, got {} instead" raise RuntimeError(msg.format(len(self.motor_ids), len(buses))) self._buses = tuple(buses) self._ardu = {} self._position = {} self._homed = False # ==== Inherited abstract methods ==== def _connect(self): def cleanup_and_log(msg, *args): self.log.debug(msg, *args) for ardu in temp.values(): ardu.close() self._ardu.clear() def reset(ardu): ardu.dtr = False time.sleep(0.1) ardu.reset_input_buffer() ardu.dtr = True temp = {} for bus in self._buses: try: ardu = serial.Serial(bus, 9600) # ardu = TestSerial(bus, 9600) except serial.SerialException: cleanup_and_log("Failed to open serial '%s'", bus) return False # Reset arduino and flush input buffer # (it is not always reseted on port open) reset(ardu) temp[bus] = ardu # Split into second cycle so that arduinos restart in parallel for bus, ardu in temp.items(): # Wait for hello meassage == "Start\r\n" ardu.timeout = 5 recv = ardu.readline().strip().decode("ascii") ardu.timeout = 0.5 # TODO How long timeout? Often waiting 1 full timeout at the end of _move() if (recv == "Start"): # Read ID message from next line e.g. "R\r\n" motor_id = ardu.readline().strip().decode("ascii") elif (recv.startswith("Start")): motor_id = recv.partition(" ")[-1] else: cleanup_and_log("Unexpected start message '%s' from" " arduino at %s", recv, bus) return False self._ardu[motor_id] = ardu if motor_id not in self.motor_ids: cleanup_and_log("Detected motor id '%s' at '%s' which is not" " listed in calibration file", bus, motor_id) return False # Third message is current position, _read_serial parses it self._read_serial(motor_id) self.log.debug("Arduino id=%s initialized", motor_id) # Fourth message == "L free\r\n", discard self._read_serial(motor_id) self._homed = False return True def _disconnect(self): for key in self._ardu: self._ardu[key].write(b"n") # Immediate stop self._ardu[key].flush() self._ardu[key].close() self._ardu.clear() def _is_ready(self): return True def _safestate(self): """ 1. Stop immediately all motors. 2. Move any motors that are outside of software limits. """ for ardu in self._ardu.values(): ardu.write(b"n") # Immediate stop | send in parallel for key in self._ardu: # Wait for responses "STOP \r\n" or timeout ("" is returned) while self._read_serial(key) not in ["STOP", ""]: pass if self.is_homed(): self._moveto_limits(key) # ==== Private commands ==== @base.base_command def _read_serial(self, motor_id): """Read line from serial input and return it. Additionally, when line contains position in format 'Position: 12.3 mm', it is extracted and saved to ``self._position``. """ line = self._ardu[motor_id].readline().strip().decode("ascii") if line.startswith("Position: "): pos = self._calib.apply(motor_id, float(line.split(" ")[1])) self._position[motor_id] = pos return line @base.base_command def _clear_serial_queue(self, motor_id): """Read lines from serial input until no input is availible.""" while self._read_serial(motor_id) != "": pass @base.compound_command def _move(self, motor_id, distance, check_limits=True): """Move ``distance`` [mm] from current position. Use + and - values to specify direction. """ distance = self._calib.apply_movedist(motor_id, distance) distance *= 100 # units of "i" and "o" are [1e-2 mm] self._clear_serial_queue(motor_id) if distance > 0: self._ardu[motor_id].write(b"i") # Initiate movement in else: self._ardu[motor_id].write(b"o") # Initiate movement out self._ardu[motor_id].write("{:.0f}\n".format(abs(distance)).encode("ascii")) self._read_serial(motor_id) # response: "1234CLOSE " or OPEN -> discard endstop_msg = ["OUT reached", "IN reached"] endstop_hit = False while True: resp = self._read_serial(motor_id) if check_limits: if not self._calib.check_limits(motor_id, self.position(motor_id)): # This terminates command and triggers safestate (motion stops) raise base.CommandError("Software limit reached") # a) Movement finished if resp.startswith("POS: "): self._read_serial(motor_id) # Repeated response -> discard # a.1) If endstop has been hit at the same moment, make a note. if self._read_serial(motor_id) in endstop_msg: endstop_hit = True break # b) Endstop hit, make a note. The a) case will follow right after this. elif resp in endstop_msg: endstop_hit = True # Wait for "reverse from endstop" movement to finish if endstop_hit: while not self._read_serial(motor_id).startswith("POS: "): pass self._read_serial(motor_id) # Repeated response -> discard @base.base_command def _moveto_limits(self, motor_id): """If outside of software limits, move inside.""" if not self._calib.check_limits(motor_id, self.position(motor_id)): pos = self.position(motor_id) nearest = self._calib.nearest_in_limits(motor_id, pos) self._move(motor_id, nearest - pos, check_limits=False) # ==== Commands ==== @property def motor_ids(self): return self._calib.motor_ids
[docs] @base.base_command def position(self, motor_id): """Return position of a specified motor [mm]. :param str motor_id: Id of motor, see property ``motor_ids``. """ if motor_id not in self.motor_ids: raise base.CommandError("Motor id must be one of {}".format(self.motor_ids)) return self._position[motor_id]
[docs] def limits(self, motor_id): """Return position limits of specified motor [mm]. :param str motor_id: Id of motor, see property ``motor_ids``. """ if motor_id not in self.motor_ids: raise base.CommandError("Motor id must be one of {}".format(self.motor_ids)) return self._calib.limits(motor_id)
[docs] def is_homed(self): """Homing process executed?""" return self._homed
[docs] @base.compound_command def homing(self): """Execute homing procedure for each of the motors.""" # TODO Consider running this in parallel - could 4 motors at once draw too much current? for key in self._ardu: self._ardu[key].write(b"h") # Initiate homing while self._read_serial(key) != "HOMING COMPLETE": pass self._read_serial(key) # "OUT free" self._read_serial(key) # Current position for key in self._ardu: self._moveto_limits(key) self._homed = True self.log.info("Homing complete!")
[docs] @base.compound_command def moveto(self, motor_id, position): """Move a specified motor to new position [mm]. :param str motor_id: Id of motor, see property ``motor_ids``. :param float position: New position in mm. """ if motor_id not in self.motor_ids: raise base.CommandError("Motor id must be one of {}".format(self.motor_ids)) if not self.is_homed(): raise base.CommandError("Homing required") if not self._calib.check_limits(motor_id, float(position)): raise base.CommandError("Software limit " + str(self.limits(motor_id))+" mm") dist = float(position) - self.position(motor_id) self._move(motor_id, dist, check_limits=True)
# ==== Methods/commands related to DeviceClient ==== @base.idle_command def _idleupdate_position(self, motor_id): self._clear_serial_queue(motor_id) # Read all including the position messages
[docs] def update_frontend(self, device_client): # Not an idle command, it does not communicate with hardware # => frontend can be updated even when a command is running for key in self._ardu: self._idleupdate_position(key) value = self._position[key] device_client.emit("value", { "value": value, "formatted": "{:.2f} mm".format(value), "label": "Pos "+key, "min": min(self.limits(key)), "max": max(self.limits(key)), "id": "position_"+key })
[docs] class CalibrationFileError(Exception): pass
[docs] class MotorsCalibration: """Helper class that parses calibration file and uses the data. Calibration file - required options and data types unit, id : str invert direction : bool or yes/no offset, limit low, limit high, limits bounce : float """ def __init__(self, filename, limits_enabled=True): self._limits_enabled = limits_enabled path = os.path.join(os.path.dirname(__file__), "..", filename) self._calibdata = self._read_calibfile(path) @staticmethod def _read_calibfile(path): KEYS = { "str": ["unit", "id"], "bool": ["invert"], "float": ["offset", "limit_low", "limit_high", "limits_bounce"], } UNIT_EXP = {"m": 1e3, "dm": 1e2, "cm": 1e1, "mm": 1, "um": 1e-3} config = configparser.ConfigParser() config.read_file(open(path, "r")) calibdata = {} for name in config.sections(): sec = config[name] # Mandatory keys try: parsed = { **{key: sec.get(key) for key in KEYS["str"]}, **{key: sec.getboolean(key) for key in KEYS["bool"]}, **{key: sec.getfloat(key) for key in KEYS["float"]}, } exp = UNIT_EXP[parsed["unit"]] except ValueError: raise CalibrationFileError("Invalid values or datatypes") except KeyError: raise CalibrationFileError("Invalid value for option 'unit'") if None in parsed.values(): raise CalibrationFileError("Missing mandatory keys") for key in KEYS["float"]: parsed[key] *= exp # Optional keys parsed["scale_move"] = sec.getfloat("scale_move", fallback=1.0) parsed["scale_position"] = sec.getfloat("scale_position", fallback=1.0) # Derived parsed["limits"] = (parsed["limit_low"], parsed["limit_high"]) parsed["direction"] = -1 if parsed["invert"] else 1 calibdata[parsed['id']] = parsed return calibdata @property def motor_ids(self): return tuple(self._calibdata.keys())
[docs] def limits(self, motor_id): return self._calibdata[motor_id]["limits"]
[docs] def check_limits(self, motor_id, position): """Returns false if position is not within limits.""" if self._limits_enabled: lims = self.limits(motor_id) return (min(lims) < position < max(lims)) else: return True
[docs] def nearest_in_limits(self, motor_id, position): """Returns nearest position inside limits + bounce distance.""" bounce = self._calibdata[motor_id]["limits_bounce"] bounced_lims = ( min(self.limits(motor_id)) + bounce, max(self.limits(motor_id)) - bounce ) if position < min(bounced_lims): return min(bounced_lims) elif position > max(bounced_lims): return max(bounced_lims) else: return position
[docs] def apply(self, motor_id, position): di, sc_p, of = ( self._calibdata[motor_id][key] for key in ["direction", "scale_position", "offset"] ) return di * sc_p * position + of
[docs] def apply_movedist(self, motor_id, distance): di, sc_m = ( self._calibdata[motor_id][key] for key in ["direction", "scale_move"] ) return di * sc_m * distance
[docs] class TestSerial: def __init__(self, bus, *args, **kwargs): self.bus = bus self.msg_queue = [ "Start\r\n", bus + "\r\n", "Position: 0 mm\r\n", ] self.msg_queue.reverse()
[docs] def write(self, data): print(self.bus, data.decode("ascii"))
[docs] def readline(self): if len(self.msg_queue) > 0: return self.msg_queue.pop().encode("ascii") else: return input("Input: ").encode("ascii")
[docs] def flush(self): print(self.bus, "FLUSH")
[docs] def close(self): print(self.bus, "CLOSE")
[docs] def setDTR(self, b): print(self.bus, "SET DTR", b)
[docs] def reset_input_buffer(self): print(self.bus, "RESET INPUT BUFFER")