import serial
import time
from . import base
[docs]
class PaschenMotor(base.HardwareBase):
"""Motor that controls the distance of electrodes (Paschen experiment).
Model name: Festo EMCA-EC-67-M-1TE-CO
Communication:
- CAN bus, using FHPP protocol (see documentation file EMCA-EC-C-HP.pdf)
- CAN bus connects to Arduino that connects to RaspberryPi via serial (USB)
Note: See 'hardware/arduino_motor_emca/' for Arduino sourcecode.
"""
def __init__(self, serialbus, nodeid, **kwargs):
"""Constructor.
:param str serialbus: Name of serial interface to which Arduino is
connected (e.g. "/dev/ttyUSB0").
:param int nodeid: CANopen node id of EMCA, can be viewed and changed
in Festo Configuration Tool. Default is 0x01.
"""
super().__init__(**kwargs)
self._arduino = None
self._arduino_config = {'serialbus': serialbus, 'nodeid': nodeid}
self._bits = self._init_bits()
self._exponent = None
self._limits = None
self._limits_mm = None
self._last = {
"distance": 0,
"valve_open": False,
"relay_hvps_on": False,
"relay_motor_on": False,
}
@staticmethod
def _init_bits():
"""Init name mapping of main Control and Status bytes."""
bit_names = {
"ccon": ["ENABLE", "STOP", "BRAKE", "RESET", "-", "LOCK", "OPM1", "OPM2"],
"cpos": ["HALT", "START", "HOM", "JOGP", "JOGN", "TEACH", "CLEAR", "-"],
"cdir": ["ABS", "COM1", "COM2", "-", "-", "XLIM", "-", "-"],
"scon": ["ENABLED", "OPEN", "WARN", "FAULT", "VLOAD", "FCT/MMI", "OPM1", "OPM2"],
"spos": ["HALT", "ACK", "MC", "TEACH", "MOV", "FOLERR", "STILL", "REF"],
"sdir": ["ABS", "COM1", "COM2", "-", "VLIM", "XLIM", "-", "-"],
}
return {
byte: {bit: 0b1 << i for i, bit in enumerate(bits) if bit != '-'}
for byte, bits in bit_names.items()
}
# ==== Inherited abstract methods ====
def _connect(self):
self._arduino = Arduino(**self._arduino_config)
self._arduino.toggle_relay_state("RELAY_24_MOTOR") # Power on motor
# TODO Add some time delay, before motor powers up?
self._arduino.start()
try:
# Read settings about units (SINC <-> m) and software limits
self._exponent = int.from_bytes(self._arduino.sdo_read(0x2258),
"little", signed=True)
self._limits = (
int.from_bytes(self._arduino.sdo_read(0x21f5, 1),
"little", signed=True),
int.from_bytes(self._arduino.sdo_read(0x21f5, 2),
"little", signed=True),
)
self._limits_mm = tuple(-1 * a * 10**(3+self._exponent)
for a in self._limits)
# Power-on -> Enabled -> Ready
self._set_cbyte("ENABLE", True, feedback="ENABLED")
self._set_cbyte("STOP", True, feedback="OPEN")
# Direct operation
self._set_cbyte("OPM2", False, feedback="OPM2")
self._set_cbyte("OPM1", True, feedback="OPM1")
# EMCA has not responded with desired status before timeout
except base.TransmissionError:
self._disconnect()
return False
else:
return True
def _disconnect(self):
try:
# Move to 5mm, so that next homing does not take too long
self.set_distance(5)
except base.DeviceError:
pass
try:
self._set_cbyte("ENABLE", False, feedback="ENABLED")
except base.TransmissionError:
pass
try:
self._arduino.stop()
except base.TransmissionError:
pass
self._arduino = None
def _is_ready(self):
try:
self._arduino.sdo_read(0x2431) # Read something (DC voltage)
except base.TransmissionError:
return False
else:
return True
def _safestate(self):
"""Note that purpose of this safestate is to
1. stop moving if moving
2. make sure drive and operation are enabled (e.g. motor is ready)
"""
# This line is important in case of KeyboardInterrupt / stop() call
self._arduino._serial.reset_input_buffer()
# Quick deceleration
self._set_cbyte("STOP", False, feedback="OPEN")
# Try to acknowledge fault (may raise exception)
# Info: Acknowledgement failed -> safstate failed -> disconnect.
# Then on connect the device is restarted, possibly finally
# clearing the fault.
if self._sbyte("FAULT"):
self._set_cbyte("RESET", True)
self._waitfor_sbyte("FAULT", False, timeout=1)
# Try to return to ready state
self._set_cbyte("ENABLE", True, feedback="ENABLED")
self._set_cbyte("STOP", True, feedback="OPEN")
def _readerror(self):
# NOTE Consider if errors can be read directly from EMCA using 0x1???
# CANopen objects. EMCA-EC-C-HP.pdf page 252
if self._sbyte("FAULT"):
emcy = self._arduino.emcy_readcodes()
if emcy:
return "EMCY: " + ", ".join(emcy)
else:
return "Fault detected but no EMCY received"
else:
return None
# def _defaultstate(self):
# NOTE Anything that could be here should probably be set via FCT.
# self.set_homing("current")
# Software limits -> (-3000, 97000) [default]
# data = (-3000).to_bytes(4, "little", signed=True)
# self._arduino.sdo_write(data, 0x21f5, 1)
# data = (97000).to_bytes(4, "little", signed=True)
# self._arduino.sdo_write(data, 0x21f5, 2)
# Project axis zero -> 0 [default]
# data = (0).to_bytes(4, "little", signed=True)
# self._arduino.sdo_write(data, 0x21f4)
# ==== Private commands ====
@base.base_command
def _sbyte(self, which):
"""Get state of specified bit 'which' in one of the status bytes SCON,
SPOS or SDIR.
"""
state = self._arduino.fhpp_state()
for i, byte in enumerate(["scon", "spos", "sdir"]):
if which in self._bits[byte]:
return (state[i] & self._bits[byte][which]) != 0
else:
raise KeyError("Unknown status bit '{}'".format(which))
@base.compound_command # Compound so that it can be interrupted
def _waitfor_sbyte(self, which, b, timestep=10e-3, timeout=1, on_loop=None):
"""Wait for specified bit 'which' in one of the status bytes SCON,
SPOS or SDIR to change to value 'b'=True/False.
"""
if timeout is None:
while self._sbyte(which) != bool(b):
if on_loop is not None:
on_loop()
time.sleep(timestep)
else:
for _ in range(int(timeout/timestep)):
if self._sbyte(which) == bool(b):
break
if on_loop is not None:
on_loop()
time.sleep(timestep)
else:
raise base.TransmissionError("Timed out while waiting for status")
@base.base_command
def _set_cbyte(self, which, b, data1=0, data2=0, feedback=None):
"""Set a specified bit 'which' in one of the control bytes CCON, CPOS,
or CDIR to value 'b'=True/False.
If 'feedback' bit is set, method will block until the specified
feedback bit in one of the status bytes has also value 'b'.
"""
def change_bit(byte, bitmask):
return (byte | bitmask) if bool(b) else (byte & (~bitmask))
# Some status bits (but not all!) represent a feedback on current
# state of some control values. Read them to acquire current state.
state = self._arduino.fhpp_state()
control = [
state[0] & 0b11100011,
state[1] & 0b00000001,
state[2] & 0b00000111,
]
# Modify current state
for i, byte in enumerate(["ccon", "cpos", "cdir"]):
if which in self._bits[byte]:
control[i] = change_bit(control[i], self._bits[byte][which])
break
else:
raise KeyError("Unknown control bit '{}'".format(which))
self._arduino.fhpp_send(*control, data1, data2)
if feedback is not None:
self._waitfor_sbyte(feedback, b)
@base.base_command
def _read_relay_states(self):
"""Read state of power relays and store it to ``self._last``."""
states = self._arduino.get_relay_states()
self._last["valve_open"] = states["RELAY_24_VALVE"]
self._last["relay_motor_on"] = states["RELAY_24_MOTOR"]
self._last["relay_hvps_on"] = states["RELAY_230"]
# ==== Commands ====
# NOTE: This should be now handled by _safestate(). Also it would not work
# because of _readerror().
# @base.base_command
# def reset_error(self):
# if self._sbyte("FAULT"):
# self._set_cbyte("RESET", True)
# try:
# self._waitfor_sbyte("FAULT", False, timeout=1)
# except base.TransmissionError:
# return
# self._set_cbyte("ENABLE", True, feedback="ENABLED")
# self._set_cbyte("STOP", True, feedback="OPEN")
# NOTE: This is probably not needed anymore, it should be set in FCT.
# @base.base_command
# def set_homing(self, method, speed=None, maxtorque=None, stoptime=None):
# self._set_cbyte("STOP", False, feedback="OPEN")
# self._set_cbyte("ENABLE", False, feedback="ENABLED")
# # Offset axis zero -> 0
# data = (0).to_bytes(4, "little", signed=True)
# self._arduino.sdo_write(data, 0x23f2)
# # Homing method -> ?
# data = {
# "current": b"\xdd",
# "stop_positive": b"\xee",
# "stop_negative": b"\xef",
# }[method]
# self._arduino.sdo_write(data, 0x23f3)
# # Search speed [SINC/s] -> ?
# if maxtorque is not None:
# data = maxtorque.to_bytes(4, "little", signed=True)
# self._arduino.sdo_write(data, 0x23f4, 1)
# # Max torque for stop method [per mille] -> ?
# if maxtorque is not None:
# data = maxtorque.to_bytes(2, "little", signed=True)
# self._arduino.sdo_write(data, 0x23f7)
# # Stop time for stop method [ms] -> ?
# if stoptime is not None:
# data = stoptime.to_bytes(2, "little", signed=False)
# self._arduino.sdo_write(data, 0x23f9)
# self._set_cbyte("ENABLE", True, feedback="ENABLED")
# self._set_cbyte("STOP", True, feedback="OPEN")
[docs]
@base.compound_command
def homing(self):
"""Execute homing procedure (calibration)."""
if not self._sbyte("MC"):
raise base.CommandError("Another task is being executed")
self._set_cbyte("HALT", True, feedback="HALT")
self._set_cbyte("HOM", True, feedback="ACK")
self._set_cbyte("HOM", False)
self._waitfor_sbyte("MC", True, timestep=100e-3, timeout=None)
[docs]
def limits(self):
"""Return limits on distance of electrodes [mm]."""
return self._limits_mm
[docs]
@base.base_command
def distance(self):
"""Return distance of electrodes [mm]."""
self._last["distance"] = -1 * self._arduino.fhpp_state()[4] * 10**(3+self._exponent)
return self._last["distance"]
[docs]
@base.compound_command
def set_distance(self, distance):
"""Set new distance of electrodes [mm].
:param float distance: value in range cca 0.5mm-90mm.
"""
if not self._sbyte("REF"):
raise base.CommandError("Homing required")
if not self._sbyte("MC"):
raise base.CommandError("Another task is being executed")
# NOTE: There is a -1 to reverse the direction. Consider changing
# PNU 1000 setting "Reversal of direction / Polarity" in FCT.
# Also there is a -1 in distance() and limits() !!!
sinc = -1 * int(round(float(distance) * 10**(-3-self._exponent)))
if not (self._limits[0] < sinc < self._limits[1]):
raise base.CommandError("Out of bounds {:.2f}-{:.2f}mm".format(*self.limits()))
self._set_cbyte("HALT", True, feedback="HALT")
self._set_cbyte("START", True, 0xff, sinc, feedback="ACK")
self._set_cbyte("START", False)
# Wait until the task is finished
self._waitfor_sbyte("MC", True, timestep=100e-3, timeout=None,
on_loop=self._idle_update_all)
[docs]
@base.base_command
def is_relay_on(self):
"""Return state of power relay of the HV power suply device."""
self._read_relay_states()
return self._last["relay_hvps_on"]
[docs]
@base.base_command
def switch_relay_on(self):
"""Switch on power for the HV power supply device."""
if not self.is_relay_on():
self._arduino.toggle_relay_state("RELAY_230")
[docs]
@base.base_command
def switch_relay_off(self):
"""Switch off power for the HV power supply device."""
if self.is_relay_on():
self._arduino.toggle_relay_state("RELAY_230")
[docs]
@base.base_command
def is_valve_open(self):
"""Return the state of main chamber valve."""
self._read_relay_states()
return self._last["valve_open"]
[docs]
@base.base_command
def open_valve(self):
"""Open main chamber valve."""
if not self.is_valve_open():
self._arduino.toggle_relay_state("RELAY_24_VALVE")
[docs]
@base.base_command
def close_valve(self):
"""Close main chamber valve."""
if self.is_valve_open():
self._arduino.toggle_relay_state("RELAY_24_VALVE")
[docs]
@base.compound_command
def pingpong(self):
for i in range(10, 50, 5):
self.set_distance(5)
self.set_distance(i)
# ==== Methods/commands related to DeviceClient ====
@base.idle_command
def _idle_update_all(self):
# These methods update self._last
self.distance()
self._read_relay_states()
[docs]
def update_frontend(self, device_client):
self._idle_update_all() # Update states if device not busy
value = self._last["distance"] # mm
device_client.emit("value", {
"value": value,
"formatted": "{:.2f} mm".format(value),
"label": "Distance",
"min": min(*self.limits()),
"max": max(*self.limits()),
"id": "electrode_distance"
})
state = self._last["valve_open"]
device_client.emit("value", {
"value": 1 if state else 0,
"formatted": "open" if state else "closed",
"label": "Valve",
"min": 0,
"max": 1,
"id": "state_relay_valve"
})
state = self._last["relay_hvps_on"]
device_client.emit("value", {
"value": 1 if state else 0,
"formatted": "on" if state else "off",
"label": "HV pwr supply",
"min": 0,
"max": 1,
"id": "state_relay_hvps"
})
[docs]
class Arduino:
"""Internal helper class for communication with Arduino.
Note: See 'hardware/arduino_motor_emca/' for Arduino sourcecode.
"""
def __init__(self, serialbus, nodeid):
self.nodeid = nodeid # CAN node id, number, 7 bits = 0x01-0x7f
def reset_arduino(ardu):
ardu.setDTR(False)
time.sleep(0.1)
ardu.reset_input_buffer()
ardu.setDTR(True)
# This line raises serial.SerialException if ``serialbus`` is not valid
self._serial = serial.Serial(serialbus, 9600, timeout=2)
reset_arduino(self._serial)
time.sleep(5)
def _parseline(self):
"""Read and parse line from serial according to common format of
Arduino responses.
"""
recv = self._serial.readline()
if len(recv) == 0 or recv[-1] != ord("\n"):
raise base.TransmissionError("Arduino not responding")
recv = recv.strip().decode("ascii")
return recv[0], recv[1:]
def _getpackets(self, ptype):
"""Get all packets of specified type buffered in Arduino. Note that
for different types buffers have different size and behaviour.
:param str ptype: Packet types: TPDO1, EMCY, SDO. Additionally, other
packets can be retrieved with ``ptype=OTHER``.
Returns a list of tuples (packet type [str], packet data [bytes]).
"""
# FUNCTION NAME (= packet type): See EMCA-EC-C-HP.pdf page 234
ptype_map = {
0b1110: "NMT", # Network management
0b0001: "EMCY", # Emergency object
0b0011: "TPDO1", # FHPP
0b0101: "TPDO2", # FCT
0b0111: "TPDO3", # FHPP+
0b1001: "TPDO4", # FHPP+
0b1011: "SDO", # Service data object
}
def parsepacket():
recv = self._serial.readline().decode("ascii")
print("r: " + recv.strip())
# Parsing format "iii:dddd...\r\n" where iii and dd are hex numbers
packetid = int(recv[:3], 16)
data = bytes.fromhex(recv[4:])
# - FUNCTION CODE: First 4bits of 11bit packetid
ptype = ptype_map[packetid >> 7]
# - NODE ID: See EMCA-EC-C-HP.pdf page 234, last 7bits of packetid
# nodeid = packetid & 0b00001111111
return ptype, data
self._serial.write("?{}\n".format(ptype).encode("ascii"))
code, count = self._parseline()
if code != "?":
raise base.TransmissionError("Arduino: " + code + count)
packets = []
for _ in range(int(count)):
packets.append(parsepacket())
return packets
def _sendpacket(self, ptype, data):
"""Use serial connection with Arduino to send a CAN packet.
:param str ptype: type of packet e.g. 'NMT', 'RPDO1', ... see EMCA docs
EMCA-EC-C-HP.pdf page 234
:param bytes data: content of the packet (0-64 bytes)
"""
# FUNCTION CODE (or packet type): See EMCA-EC-C-HP.pdf page 234
funccode = {
"NMT": 0b0000,
"SYNC": 0b0001,
"RPDO1": 0b0100,
"RPDO2": 0b0110,
"RPDO3": 0b1000,
"RPDO4": 0b1010,
"SDO": 0b1100,
}[ptype]
packetid = funccode << 7
if ptype not in ["NMT", "SYNC"]:
packetid |= self.nodeid
msg = "@{:03x}:{}\n".format(packetid, data.hex())
print("t: " + msg.strip())
self._serial.write(msg.encode("ascii"))
code, msg = self._parseline()
if (code, msg) != ("@", "OK"):
raise base.TransmissionError("Arduino: " + code + msg)
[docs]
def start(self):
"""Bring Arduino and EMCA into an active state."""
self._serial.write(b">START\n")
time.sleep(3) # EMCA is restarting and it takes time
code, msg = self._parseline()
if (code, msg) == (">", "OK"):
return
elif (code, msg) == ("!", "Timed out"):
raise base.TransmissionError("EMCA motor not responding")
else:
raise base.TransmissionError("Arduino: " + code + msg)
[docs]
def stop(self):
"""Stop EMCA and Arduino."""
self._serial.write(b">STOP\n")
code, msg = self._parseline()
if (code, msg) != (">", "OK"):
raise base.TransmissionError("Arduino: " + code + msg)
[docs]
def toggle_relay_state(self, which):
"""Toggle state of the relays connected to Arduino. All relays are
initially switched off.
:param str which: one of ``RELAY_24_VALVE``, ``RELAY_24_MOTOR``,
``RELAY_230``
"""
assert which in {"RELAY_24_VALVE", "RELAY_24_MOTOR", "RELAY_230"}
self._serial.write(">TOGGLE_{}\n".format(which).encode("ascii"))
code, msg = self._parseline()
if (code, msg) != (">", "OK"):
raise base.TransmissionError("Arduino: " + code + msg)
[docs]
def get_relay_states(self):
"""Get current state of all three relays in a dict. The keys of the
returned dict correspond to parameter which in ``toggle_relay_state()``.
"""
self._serial.write(b">GET_RELAY_STATES\n")
states = {}
for _ in range(3):
which, value = "".join(self._parseline()).split()
states[which] = (value == "1")
if not all(k in states for k in {"RELAY_24_VALVE", "RELAY_24_MOTOR", "RELAY_230"}):
raise base.TransmissionError("Failed to acquire all relay states")
code, msg = self._parseline()
if (code, msg) != (">", "OK"):
raise base.TransmissionError("Arduino: " + code + msg)
return states
[docs]
def sdo_read(self, index, subindex=1):
"""Execute SDO read transaction.
:return bytes data: little-endian representation of 1, 2, or 4 bytes
"""
msg = b"\x40"
msg += index.to_bytes(2, "little") + subindex.to_bytes(1, "little")
msg += b"\x00\x00\x00\x00"
self._sendpacket("SDO", msg)
time.sleep(10e-3) # Give EMCA time (10ms) to respond to Arduino
_, msg = self._getpackets("SDO")[0]
if msg[0] == 0x4f:
return msg[4:5]
elif msg[0] == 0x4b:
return msg[4:6]
elif msg[0] == 0x43:
return msg[4:]
else: # msg[0] == 0x80 signalizes SDO error, msg[4:] is error code
raise base.TransmissionError("Bad SDO transaction "+msg[4:].hex())
[docs]
def sdo_write(self, data, index, subindex=1):
"""Execute SDO write transaction.
:param bytes data: little-endian representation of 1, 2, or 4 bytes
"""
dlen = len(data)
msg = {1: b"\x2f", 2: b"\x2b", 4: b"\x23"}[dlen]
msg += index.to_bytes(2, "little") + subindex.to_bytes(1, "little")
msg += data + (4-dlen) * b"\x00"
self._sendpacket("SDO", msg)
time.sleep(10e-3) # Give EMCA time (10ms) to respond to Arduino
_, msg = self._getpackets("SDO")[0]
if msg[0] == 0x60:
return
else: # msg[0] == 0x80 signalizes SDO error, msg[4:] is error code
raise base.TransmissionError("Bad SDO transaction "+msg[4:].hex())
[docs]
def fhpp_state(self, actual2signed=True):
"""Get most recent FHPP packet (type TPDO1) received from EMCA,
see EMCA-EC-C-HP.pdf page 54.
Returns list of [SCON, SPOS, SDIR, actual1, actual2].
Note: actual2 is by default interpretted as signed int, actual1 is
always unsigned.
"""
_, msg = self._getpackets("TPDO1")[0]
res = [msg[i] for i in range(4)]
return res + [int.from_bytes(msg[4:], "little", signed=actual2signed)]
[docs]
def fhpp_send(self, ccon, cpos, cdir, data1=0, data2=0, data2signed=True):
"""Send basic FHPP packet (type RPDO1), see EMCA-EC-C-HP.pdf page 50.
Packet structure:
ccon, cpos, cdit, data1 - each size 1 byte, usigned int
data2 - size 4 bytes, signed int (by default)
"""
msg = b""
for a in [ccon, cpos, cdir, data1]:
msg += a.to_bytes(1, "little")
msg += data2.to_bytes(4, "little", signed=data2signed)
self._sendpacket("RPDO1", msg)
[docs]
def emcy_readcodes(self):
"""Parse all buffered EMCY packets and return them as a list of 2-byte
errorcodes in string format '0x0123'. EMCA-EC-C-HP.pdf page 255
"""
return ["0x{:04x}".format(int.from_bytes(data[0:2], "little"))
for _, data in self._getpackets("EMCY")]
[docs]
def other_readall(self):
"""Return a list of all buffered uncathegorized packets. Packets are
represented as tuples (packet type [str], packet data [bytes]).
"""
return self._getpackets("OTHER")