import serial
import os
import glob
import time
import configparser
from . import base
[docs]
class ArduinoMotorsBase(base.HardwareBase):
"""Base class for ..."""
def __init__(self, calibfile, buses=None, limits_enabled=True, **kwargs):
super().__init__(**kwargs)
self._calib = MotorsCalibration(calibfile, limits_enabled)
# Automatically search for USB serial devices if none were specified
if buses is None:
buses = glob.glob("/dev/ttyUSB*")
if len(buses) != len(self.motor_ids):
msg = "Expected {} usb serial ports, got {} instead"
raise RuntimeError(msg.format(len(self.motor_ids), len(buses)))
self._buses = tuple(buses)
self._ardu = {}
self._position = {}
self._homed = False
# ==== Inherited abstract methods ====
def _connect(self):
def cleanup_and_log(msg, *args):
self.log.debug(msg, *args)
for ardu in temp.values():
ardu.close()
self._ardu.clear()
def reset(ardu):
ardu.dtr = False
time.sleep(0.1)
ardu.reset_input_buffer()
ardu.dtr = True
temp = {}
for bus in self._buses:
try:
ardu = serial.Serial(bus, 9600)
# ardu = TestSerial(bus, 9600)
except serial.SerialException:
cleanup_and_log("Failed to open serial '%s'", bus)
return False
# Reset arduino and flush input buffer
# (it is not always reseted on port open)
reset(ardu)
temp[bus] = ardu
# Split into second cycle so that arduinos restart in parallel
for bus, ardu in temp.items():
# Wait for hello meassage == "Start\r\n"
ardu.timeout = 5
recv = ardu.readline().strip().decode("ascii")
ardu.timeout = 0.5 # TODO How long timeout? Often waiting 1 full timeout at the end of _move()
if (recv == "Start"):
# Read ID message from next line e.g. "R\r\n"
motor_id = ardu.readline().strip().decode("ascii")
elif (recv.startswith("Start")):
motor_id = recv.partition(" ")[-1]
else:
cleanup_and_log("Unexpected start message '%s' from"
" arduino at %s", recv, bus)
return False
self._ardu[motor_id] = ardu
if motor_id not in self.motor_ids:
cleanup_and_log("Detected motor id '%s' at '%s' which is not"
" listed in calibration file", bus, motor_id)
return False
# Third message is current position, _read_serial parses it
self._read_serial(motor_id)
self.log.debug("Arduino id=%s initialized", motor_id)
# Fourth message == "L free\r\n", discard
self._read_serial(motor_id)
self._homed = False
return True
def _disconnect(self):
for key in self._ardu:
self._ardu[key].write(b"n") # Immediate stop
self._ardu[key].flush()
self._ardu[key].close()
self._ardu.clear()
def _is_ready(self):
return True
def _safestate(self):
"""
1. Stop immediately all motors.
2. Move any motors that are outside of software limits.
"""
for ardu in self._ardu.values():
ardu.write(b"n") # Immediate stop | send in parallel
for key in self._ardu:
# Wait for responses "STOP \r\n" or timeout ("" is returned)
while self._read_serial(key) not in ["STOP", ""]:
pass
if self.is_homed():
self._moveto_limits(key)
# ==== Private commands ====
@base.base_command
def _read_serial(self, motor_id):
"""Read line from serial input and return it. Additionally, when line
contains position in format 'Position: 12.3 mm', it is extracted and
saved to ``self._position``.
"""
line = self._ardu[motor_id].readline().strip().decode("ascii")
if line.startswith("Position: "):
pos = self._calib.apply(motor_id, float(line.split(" ")[1]))
self._position[motor_id] = pos
return line
@base.base_command
def _clear_serial_queue(self, motor_id):
"""Read lines from serial input until no input is availible."""
while self._read_serial(motor_id) != "":
pass
@base.compound_command
def _move(self, motor_id, distance, check_limits=True):
"""Move ``distance`` [mm] from current position. Use + and - values
to specify direction.
"""
distance = self._calib.apply_movedist(motor_id, distance)
distance *= 100 # units of "i" and "o" are [1e-2 mm]
self._clear_serial_queue(motor_id)
if distance > 0:
self._ardu[motor_id].write(b"i") # Initiate movement in
else:
self._ardu[motor_id].write(b"o") # Initiate movement out
self._ardu[motor_id].write("{:.0f}\n".format(abs(distance)).encode("ascii"))
self._read_serial(motor_id) # response: "1234CLOSE " or OPEN -> discard
endstop_msg = ["OUT reached", "IN reached"]
endstop_hit = False
while True:
resp = self._read_serial(motor_id)
if check_limits:
if not self._calib.check_limits(motor_id, self.position(motor_id)):
# This terminates command and triggers safestate (motion stops)
raise base.CommandError("Software limit reached")
# a) Movement finished
if resp.startswith("POS: "):
self._read_serial(motor_id) # Repeated response -> discard
# a.1) If endstop has been hit at the same moment, make a note.
if self._read_serial(motor_id) in endstop_msg:
endstop_hit = True
break
# b) Endstop hit, make a note. The a) case will follow right after this.
elif resp in endstop_msg:
endstop_hit = True
# Wait for "reverse from endstop" movement to finish
if endstop_hit:
while not self._read_serial(motor_id).startswith("POS: "):
pass
self._read_serial(motor_id) # Repeated response -> discard
@base.base_command
def _moveto_limits(self, motor_id):
"""If outside of software limits, move inside."""
if not self._calib.check_limits(motor_id, self.position(motor_id)):
pos = self.position(motor_id)
nearest = self._calib.nearest_in_limits(motor_id, pos)
self._move(motor_id, nearest - pos, check_limits=False)
# ==== Commands ====
@property
def motor_ids(self):
return self._calib.motor_ids
[docs]
@base.base_command
def position(self, motor_id):
"""Return position of a specified motor [mm].
:param str motor_id: Id of motor, see property ``motor_ids``.
"""
if motor_id not in self.motor_ids:
raise base.CommandError("Motor id must be one of {}".format(self.motor_ids))
return self._position[motor_id]
[docs]
def limits(self, motor_id):
"""Return position limits of specified motor [mm].
:param str motor_id: Id of motor, see property ``motor_ids``.
"""
if motor_id not in self.motor_ids:
raise base.CommandError("Motor id must be one of {}".format(self.motor_ids))
return self._calib.limits(motor_id)
[docs]
def is_homed(self):
"""Homing process executed?"""
return self._homed
[docs]
@base.compound_command
def homing(self):
"""Execute homing procedure for each of the motors."""
# TODO Consider running this in parallel - could 4 motors at once draw too much current?
for key in self._ardu:
self._ardu[key].write(b"h") # Initiate homing
while self._read_serial(key) != "HOMING COMPLETE":
pass
self._read_serial(key) # "OUT free"
self._read_serial(key) # Current position
for key in self._ardu:
self._moveto_limits(key)
self._homed = True
self.log.info("Homing complete!")
[docs]
@base.compound_command
def moveto(self, motor_id, position):
"""Move a specified motor to new position [mm].
:param str motor_id: Id of motor, see property ``motor_ids``.
:param float position: New position in mm.
"""
if motor_id not in self.motor_ids:
raise base.CommandError("Motor id must be one of {}".format(self.motor_ids))
if not self.is_homed():
raise base.CommandError("Homing required")
if not self._calib.check_limits(motor_id, float(position)):
raise base.CommandError("Software limit " + str(self.limits(motor_id))+" mm")
dist = float(position) - self.position(motor_id)
self._move(motor_id, dist, check_limits=True)
# ==== Methods/commands related to DeviceClient ====
@base.idle_command
def _idleupdate_position(self, motor_id):
self._clear_serial_queue(motor_id) # Read all including the position messages
[docs]
def update_frontend(self, device_client):
# Not an idle command, it does not communicate with hardware
# => frontend can be updated even when a command is running
for key in self._ardu:
self._idleupdate_position(key)
value = self._position[key]
device_client.emit("value", {
"value": value,
"formatted": "{:.2f} mm".format(value),
"label": "Pos "+key,
"min": min(self.limits(key)),
"max": max(self.limits(key)),
"id": "position_"+key
})
[docs]
class CalibrationFileError(Exception):
pass
[docs]
class MotorsCalibration:
"""Helper class that parses calibration file and uses the data.
Calibration file - required options and data types
unit, id : str
invert direction : bool or yes/no
offset, limit low, limit high, limits bounce : float
"""
def __init__(self, filename, limits_enabled=True):
self._limits_enabled = limits_enabled
path = os.path.join(os.path.dirname(__file__), "..", filename)
self._calibdata = self._read_calibfile(path)
@staticmethod
def _read_calibfile(path):
KEYS = {
"str": ["unit", "id"],
"bool": ["invert"],
"float": ["offset", "limit_low", "limit_high", "limits_bounce"],
}
UNIT_EXP = {"m": 1e3, "dm": 1e2, "cm": 1e1, "mm": 1, "um": 1e-3}
config = configparser.ConfigParser()
config.read_file(open(path, "r"))
calibdata = {}
for name in config.sections():
sec = config[name]
# Mandatory keys
try:
parsed = {
**{key: sec.get(key) for key in KEYS["str"]},
**{key: sec.getboolean(key) for key in KEYS["bool"]},
**{key: sec.getfloat(key) for key in KEYS["float"]},
}
exp = UNIT_EXP[parsed["unit"]]
except ValueError:
raise CalibrationFileError("Invalid values or datatypes")
except KeyError:
raise CalibrationFileError("Invalid value for option 'unit'")
if None in parsed.values():
raise CalibrationFileError("Missing mandatory keys")
for key in KEYS["float"]:
parsed[key] *= exp
# Optional keys
parsed["scale_move"] = sec.getfloat("scale_move", fallback=1.0)
parsed["scale_position"] = sec.getfloat("scale_position", fallback=1.0)
# Derived
parsed["limits"] = (parsed["limit_low"], parsed["limit_high"])
parsed["direction"] = -1 if parsed["invert"] else 1
calibdata[parsed['id']] = parsed
return calibdata
@property
def motor_ids(self):
return tuple(self._calibdata.keys())
[docs]
def limits(self, motor_id):
return self._calibdata[motor_id]["limits"]
[docs]
def check_limits(self, motor_id, position):
"""Returns false if position is not within limits."""
if self._limits_enabled:
lims = self.limits(motor_id)
return (min(lims) < position < max(lims))
else:
return True
[docs]
def nearest_in_limits(self, motor_id, position):
"""Returns nearest position inside limits + bounce distance."""
bounce = self._calibdata[motor_id]["limits_bounce"]
bounced_lims = (
min(self.limits(motor_id)) + bounce,
max(self.limits(motor_id)) - bounce
)
if position < min(bounced_lims):
return min(bounced_lims)
elif position > max(bounced_lims):
return max(bounced_lims)
else:
return position
[docs]
def apply(self, motor_id, position):
di, sc_p, of = (
self._calibdata[motor_id][key] for key in
["direction", "scale_position", "offset"]
)
return di * sc_p * position + of
[docs]
def apply_movedist(self, motor_id, distance):
di, sc_m = (
self._calibdata[motor_id][key] for key in
["direction", "scale_move"]
)
return di * sc_m * distance
[docs]
class TestSerial:
def __init__(self, bus, *args, **kwargs):
self.bus = bus
self.msg_queue = [
"Start\r\n",
bus + "\r\n",
"Position: 0 mm\r\n",
]
self.msg_queue.reverse()
[docs]
def write(self, data):
print(self.bus, data.decode("ascii"))
[docs]
def readline(self):
if len(self.msg_queue) > 0:
return self.msg_queue.pop().encode("ascii")
else:
return input("Input: ").encode("ascii")
[docs]
def flush(self):
print(self.bus, "FLUSH")
[docs]
def close(self):
print(self.bus, "CLOSE")
[docs]
def setDTR(self, b):
print(self.bus, "SET DTR", b)