Source code for device.paschen_motor

import serial

import time

from . import base


[docs] class PaschenMotor(base.HardwareBase): """Motor that controls the distance of electrodes (Paschen experiment). Model name: Festo EMCA-EC-67-M-1TE-CO Communication: - CAN bus, using FHPP protocol (see documentation file EMCA-EC-C-HP.pdf) - CAN bus connects to Arduino that connects to RaspberryPi via serial (USB) Note: See 'hardware/arduino_motor_emca/' for Arduino sourcecode. """ def __init__(self, serialbus, nodeid, **kwargs): """Constructor. :param str serialbus: Name of serial interface to which Arduino is connected (e.g. "/dev/ttyUSB0"). :param int nodeid: CANopen node id of EMCA, can be viewed and changed in Festo Configuration Tool. Default is 0x01. """ super().__init__(**kwargs) self._arduino = None self._arduino_config = {'serialbus': serialbus, 'nodeid': nodeid} self._bits = self._init_bits() self._exponent = None self._limits = None self._limits_mm = None self._last = { "distance": 0, "valve_open": False, "relay_hvps_on": False, "relay_motor_on": False, } @staticmethod def _init_bits(): """Init name mapping of main Control and Status bytes.""" bit_names = { "ccon": ["ENABLE", "STOP", "BRAKE", "RESET", "-", "LOCK", "OPM1", "OPM2"], "cpos": ["HALT", "START", "HOM", "JOGP", "JOGN", "TEACH", "CLEAR", "-"], "cdir": ["ABS", "COM1", "COM2", "-", "-", "XLIM", "-", "-"], "scon": ["ENABLED", "OPEN", "WARN", "FAULT", "VLOAD", "FCT/MMI", "OPM1", "OPM2"], "spos": ["HALT", "ACK", "MC", "TEACH", "MOV", "FOLERR", "STILL", "REF"], "sdir": ["ABS", "COM1", "COM2", "-", "VLIM", "XLIM", "-", "-"], } return { byte: {bit: 0b1 << i for i, bit in enumerate(bits) if bit != '-'} for byte, bits in bit_names.items() } # ==== Inherited abstract methods ==== def _connect(self): self._arduino = Arduino(**self._arduino_config) self._arduino.toggle_relay_state("RELAY_24_MOTOR") # Power on motor # TODO Add some time delay, before motor powers up? self._arduino.start() try: # Read settings about units (SINC <-> m) and software limits self._exponent = int.from_bytes(self._arduino.sdo_read(0x2258), "little", signed=True) self._limits = ( int.from_bytes(self._arduino.sdo_read(0x21f5, 1), "little", signed=True), int.from_bytes(self._arduino.sdo_read(0x21f5, 2), "little", signed=True), ) self._limits_mm = tuple(-1 * a * 10**(3+self._exponent) for a in self._limits) # Power-on -> Enabled -> Ready self._set_cbyte("ENABLE", True, feedback="ENABLED") self._set_cbyte("STOP", True, feedback="OPEN") # Direct operation self._set_cbyte("OPM2", False, feedback="OPM2") self._set_cbyte("OPM1", True, feedback="OPM1") # EMCA has not responded with desired status before timeout except base.TransmissionError: self._disconnect() return False else: return True def _disconnect(self): try: # Move to 5mm, so that next homing does not take too long self.set_distance(5) except base.DeviceError: pass try: self._set_cbyte("ENABLE", False, feedback="ENABLED") except base.TransmissionError: pass try: self._arduino.stop() except base.TransmissionError: pass self._arduino = None def _is_ready(self): try: self._arduino.sdo_read(0x2431) # Read something (DC voltage) except base.TransmissionError: return False else: return True def _safestate(self): """Note that purpose of this safestate is to 1. stop moving if moving 2. make sure drive and operation are enabled (e.g. motor is ready) """ # This line is important in case of KeyboardInterrupt / stop() call self._arduino._serial.reset_input_buffer() # Quick deceleration self._set_cbyte("STOP", False, feedback="OPEN") # Try to acknowledge fault (may raise exception) # Info: Acknowledgement failed -> safstate failed -> disconnect. # Then on connect the device is restarted, possibly finally # clearing the fault. if self._sbyte("FAULT"): self._set_cbyte("RESET", True) self._waitfor_sbyte("FAULT", False, timeout=1) # Try to return to ready state self._set_cbyte("ENABLE", True, feedback="ENABLED") self._set_cbyte("STOP", True, feedback="OPEN") def _readerror(self): # NOTE Consider if errors can be read directly from EMCA using 0x1??? # CANopen objects. EMCA-EC-C-HP.pdf page 252 if self._sbyte("FAULT"): emcy = self._arduino.emcy_readcodes() if emcy: return "EMCY: " + ", ".join(emcy) else: return "Fault detected but no EMCY received" else: return None # def _defaultstate(self): # NOTE Anything that could be here should probably be set via FCT. # self.set_homing("current") # Software limits -> (-3000, 97000) [default] # data = (-3000).to_bytes(4, "little", signed=True) # self._arduino.sdo_write(data, 0x21f5, 1) # data = (97000).to_bytes(4, "little", signed=True) # self._arduino.sdo_write(data, 0x21f5, 2) # Project axis zero -> 0 [default] # data = (0).to_bytes(4, "little", signed=True) # self._arduino.sdo_write(data, 0x21f4) # ==== Private commands ==== @base.base_command def _sbyte(self, which): """Get state of specified bit 'which' in one of the status bytes SCON, SPOS or SDIR. """ state = self._arduino.fhpp_state() for i, byte in enumerate(["scon", "spos", "sdir"]): if which in self._bits[byte]: return (state[i] & self._bits[byte][which]) != 0 else: raise KeyError("Unknown status bit '{}'".format(which)) @base.compound_command # Compound so that it can be interrupted def _waitfor_sbyte(self, which, b, timestep=10e-3, timeout=1, on_loop=None): """Wait for specified bit 'which' in one of the status bytes SCON, SPOS or SDIR to change to value 'b'=True/False. """ if timeout is None: while self._sbyte(which) != bool(b): if on_loop is not None: on_loop() time.sleep(timestep) else: for _ in range(int(timeout/timestep)): if self._sbyte(which) == bool(b): break if on_loop is not None: on_loop() time.sleep(timestep) else: raise base.TransmissionError("Timed out while waiting for status") @base.base_command def _set_cbyte(self, which, b, data1=0, data2=0, feedback=None): """Set a specified bit 'which' in one of the control bytes CCON, CPOS, or CDIR to value 'b'=True/False. If 'feedback' bit is set, method will block until the specified feedback bit in one of the status bytes has also value 'b'. """ def change_bit(byte, bitmask): return (byte | bitmask) if bool(b) else (byte & (~bitmask)) # Some status bits (but not all!) represent a feedback on current # state of some control values. Read them to acquire current state. state = self._arduino.fhpp_state() control = [ state[0] & 0b11100011, state[1] & 0b00000001, state[2] & 0b00000111, ] # Modify current state for i, byte in enumerate(["ccon", "cpos", "cdir"]): if which in self._bits[byte]: control[i] = change_bit(control[i], self._bits[byte][which]) break else: raise KeyError("Unknown control bit '{}'".format(which)) self._arduino.fhpp_send(*control, data1, data2) if feedback is not None: self._waitfor_sbyte(feedback, b) @base.base_command def _read_relay_states(self): """Read state of power relays and store it to ``self._last``.""" states = self._arduino.get_relay_states() self._last["valve_open"] = states["RELAY_24_VALVE"] self._last["relay_motor_on"] = states["RELAY_24_MOTOR"] self._last["relay_hvps_on"] = states["RELAY_230"] # ==== Commands ==== # NOTE: This should be now handled by _safestate(). Also it would not work # because of _readerror(). # @base.base_command # def reset_error(self): # if self._sbyte("FAULT"): # self._set_cbyte("RESET", True) # try: # self._waitfor_sbyte("FAULT", False, timeout=1) # except base.TransmissionError: # return # self._set_cbyte("ENABLE", True, feedback="ENABLED") # self._set_cbyte("STOP", True, feedback="OPEN") # NOTE: This is probably not needed anymore, it should be set in FCT. # @base.base_command # def set_homing(self, method, speed=None, maxtorque=None, stoptime=None): # self._set_cbyte("STOP", False, feedback="OPEN") # self._set_cbyte("ENABLE", False, feedback="ENABLED") # # Offset axis zero -> 0 # data = (0).to_bytes(4, "little", signed=True) # self._arduino.sdo_write(data, 0x23f2) # # Homing method -> ? # data = { # "current": b"\xdd", # "stop_positive": b"\xee", # "stop_negative": b"\xef", # }[method] # self._arduino.sdo_write(data, 0x23f3) # # Search speed [SINC/s] -> ? # if maxtorque is not None: # data = maxtorque.to_bytes(4, "little", signed=True) # self._arduino.sdo_write(data, 0x23f4, 1) # # Max torque for stop method [per mille] -> ? # if maxtorque is not None: # data = maxtorque.to_bytes(2, "little", signed=True) # self._arduino.sdo_write(data, 0x23f7) # # Stop time for stop method [ms] -> ? # if stoptime is not None: # data = stoptime.to_bytes(2, "little", signed=False) # self._arduino.sdo_write(data, 0x23f9) # self._set_cbyte("ENABLE", True, feedback="ENABLED") # self._set_cbyte("STOP", True, feedback="OPEN")
[docs] @base.compound_command def homing(self): """Execute homing procedure (calibration).""" if not self._sbyte("MC"): raise base.CommandError("Another task is being executed") self._set_cbyte("HALT", True, feedback="HALT") self._set_cbyte("HOM", True, feedback="ACK") self._set_cbyte("HOM", False) self._waitfor_sbyte("MC", True, timestep=100e-3, timeout=None)
[docs] def limits(self): """Return limits on distance of electrodes [mm].""" return self._limits_mm
[docs] @base.base_command def distance(self): """Return distance of electrodes [mm].""" self._last["distance"] = -1 * self._arduino.fhpp_state()[4] * 10**(3+self._exponent) return self._last["distance"]
[docs] @base.compound_command def set_distance(self, distance): """Set new distance of electrodes [mm]. :param float distance: value in range cca 0.5mm-90mm. """ if not self._sbyte("REF"): raise base.CommandError("Homing required") if not self._sbyte("MC"): raise base.CommandError("Another task is being executed") # NOTE: There is a -1 to reverse the direction. Consider changing # PNU 1000 setting "Reversal of direction / Polarity" in FCT. # Also there is a -1 in distance() and limits() !!! sinc = -1 * int(round(float(distance) * 10**(-3-self._exponent))) if not (self._limits[0] < sinc < self._limits[1]): raise base.CommandError("Out of bounds {:.2f}-{:.2f}mm".format(*self.limits())) self._set_cbyte("HALT", True, feedback="HALT") self._set_cbyte("START", True, 0xff, sinc, feedback="ACK") self._set_cbyte("START", False) # Wait until the task is finished self._waitfor_sbyte("MC", True, timestep=100e-3, timeout=None, on_loop=self._idle_update_all)
[docs] @base.base_command def is_relay_on(self): """Return state of power relay of the HV power suply device.""" self._read_relay_states() return self._last["relay_hvps_on"]
[docs] @base.base_command def switch_relay_on(self): """Switch on power for the HV power supply device.""" if not self.is_relay_on(): self._arduino.toggle_relay_state("RELAY_230")
[docs] @base.base_command def switch_relay_off(self): """Switch off power for the HV power supply device.""" if self.is_relay_on(): self._arduino.toggle_relay_state("RELAY_230")
[docs] @base.base_command def is_valve_open(self): """Return the state of main chamber valve.""" self._read_relay_states() return self._last["valve_open"]
[docs] @base.base_command def open_valve(self): """Open main chamber valve.""" if not self.is_valve_open(): self._arduino.toggle_relay_state("RELAY_24_VALVE")
[docs] @base.base_command def close_valve(self): """Close main chamber valve.""" if self.is_valve_open(): self._arduino.toggle_relay_state("RELAY_24_VALVE")
[docs] @base.compound_command def pingpong(self): for i in range(10, 50, 5): self.set_distance(5) self.set_distance(i)
# ==== Methods/commands related to DeviceClient ==== @base.idle_command def _idle_update_all(self): # These methods update self._last self.distance() self._read_relay_states()
[docs] def update_frontend(self, device_client): self._idle_update_all() # Update states if device not busy value = self._last["distance"] # mm device_client.emit("value", { "value": value, "formatted": "{:.2f} mm".format(value), "label": "Distance", "min": min(*self.limits()), "max": max(*self.limits()), "id": "electrode_distance" }) state = self._last["valve_open"] device_client.emit("value", { "value": 1 if state else 0, "formatted": "open" if state else "closed", "label": "Valve", "min": 0, "max": 1, "id": "state_relay_valve" }) state = self._last["relay_hvps_on"] device_client.emit("value", { "value": 1 if state else 0, "formatted": "on" if state else "off", "label": "HV pwr supply", "min": 0, "max": 1, "id": "state_relay_hvps" })
[docs] class Arduino: """Internal helper class for communication with Arduino. Note: See 'hardware/arduino_motor_emca/' for Arduino sourcecode. """ def __init__(self, serialbus, nodeid): self.nodeid = nodeid # CAN node id, number, 7 bits = 0x01-0x7f def reset_arduino(ardu): ardu.setDTR(False) time.sleep(0.1) ardu.reset_input_buffer() ardu.setDTR(True) # This line raises serial.SerialException if ``serialbus`` is not valid self._serial = serial.Serial(serialbus, 9600, timeout=2) reset_arduino(self._serial) time.sleep(5) def _parseline(self): """Read and parse line from serial according to common format of Arduino responses. """ recv = self._serial.readline() if len(recv) == 0 or recv[-1] != ord("\n"): raise base.TransmissionError("Arduino not responding") recv = recv.strip().decode("ascii") return recv[0], recv[1:] def _getpackets(self, ptype): """Get all packets of specified type buffered in Arduino. Note that for different types buffers have different size and behaviour. :param str ptype: Packet types: TPDO1, EMCY, SDO. Additionally, other packets can be retrieved with ``ptype=OTHER``. Returns a list of tuples (packet type [str], packet data [bytes]). """ # FUNCTION NAME (= packet type): See EMCA-EC-C-HP.pdf page 234 ptype_map = { 0b1110: "NMT", # Network management 0b0001: "EMCY", # Emergency object 0b0011: "TPDO1", # FHPP 0b0101: "TPDO2", # FCT 0b0111: "TPDO3", # FHPP+ 0b1001: "TPDO4", # FHPP+ 0b1011: "SDO", # Service data object } def parsepacket(): recv = self._serial.readline().decode("ascii") print("r: " + recv.strip()) # Parsing format "iii:dddd...\r\n" where iii and dd are hex numbers packetid = int(recv[:3], 16) data = bytes.fromhex(recv[4:]) # - FUNCTION CODE: First 4bits of 11bit packetid ptype = ptype_map[packetid >> 7] # - NODE ID: See EMCA-EC-C-HP.pdf page 234, last 7bits of packetid # nodeid = packetid & 0b00001111111 return ptype, data self._serial.write("?{}\n".format(ptype).encode("ascii")) code, count = self._parseline() if code != "?": raise base.TransmissionError("Arduino: " + code + count) packets = [] for _ in range(int(count)): packets.append(parsepacket()) return packets def _sendpacket(self, ptype, data): """Use serial connection with Arduino to send a CAN packet. :param str ptype: type of packet e.g. 'NMT', 'RPDO1', ... see EMCA docs EMCA-EC-C-HP.pdf page 234 :param bytes data: content of the packet (0-64 bytes) """ # FUNCTION CODE (or packet type): See EMCA-EC-C-HP.pdf page 234 funccode = { "NMT": 0b0000, "SYNC": 0b0001, "RPDO1": 0b0100, "RPDO2": 0b0110, "RPDO3": 0b1000, "RPDO4": 0b1010, "SDO": 0b1100, }[ptype] packetid = funccode << 7 if ptype not in ["NMT", "SYNC"]: packetid |= self.nodeid msg = "@{:03x}:{}\n".format(packetid, data.hex()) print("t: " + msg.strip()) self._serial.write(msg.encode("ascii")) code, msg = self._parseline() if (code, msg) != ("@", "OK"): raise base.TransmissionError("Arduino: " + code + msg)
[docs] def start(self): """Bring Arduino and EMCA into an active state.""" self._serial.write(b">START\n") time.sleep(3) # EMCA is restarting and it takes time code, msg = self._parseline() if (code, msg) == (">", "OK"): return elif (code, msg) == ("!", "Timed out"): raise base.TransmissionError("EMCA motor not responding") else: raise base.TransmissionError("Arduino: " + code + msg)
[docs] def stop(self): """Stop EMCA and Arduino.""" self._serial.write(b">STOP\n") code, msg = self._parseline() if (code, msg) != (">", "OK"): raise base.TransmissionError("Arduino: " + code + msg)
[docs] def toggle_relay_state(self, which): """Toggle state of the relays connected to Arduino. All relays are initially switched off. :param str which: one of ``RELAY_24_VALVE``, ``RELAY_24_MOTOR``, ``RELAY_230`` """ assert which in {"RELAY_24_VALVE", "RELAY_24_MOTOR", "RELAY_230"} self._serial.write(">TOGGLE_{}\n".format(which).encode("ascii")) code, msg = self._parseline() if (code, msg) != (">", "OK"): raise base.TransmissionError("Arduino: " + code + msg)
[docs] def get_relay_states(self): """Get current state of all three relays in a dict. The keys of the returned dict correspond to parameter which in ``toggle_relay_state()``. """ self._serial.write(b">GET_RELAY_STATES\n") states = {} for _ in range(3): which, value = "".join(self._parseline()).split() states[which] = (value == "1") if not all(k in states for k in {"RELAY_24_VALVE", "RELAY_24_MOTOR", "RELAY_230"}): raise base.TransmissionError("Failed to acquire all relay states") code, msg = self._parseline() if (code, msg) != (">", "OK"): raise base.TransmissionError("Arduino: " + code + msg) return states
[docs] def sdo_read(self, index, subindex=1): """Execute SDO read transaction. :return bytes data: little-endian representation of 1, 2, or 4 bytes """ msg = b"\x40" msg += index.to_bytes(2, "little") + subindex.to_bytes(1, "little") msg += b"\x00\x00\x00\x00" self._sendpacket("SDO", msg) time.sleep(10e-3) # Give EMCA time (10ms) to respond to Arduino _, msg = self._getpackets("SDO")[0] if msg[0] == 0x4f: return msg[4:5] elif msg[0] == 0x4b: return msg[4:6] elif msg[0] == 0x43: return msg[4:] else: # msg[0] == 0x80 signalizes SDO error, msg[4:] is error code raise base.TransmissionError("Bad SDO transaction "+msg[4:].hex())
[docs] def sdo_write(self, data, index, subindex=1): """Execute SDO write transaction. :param bytes data: little-endian representation of 1, 2, or 4 bytes """ dlen = len(data) msg = {1: b"\x2f", 2: b"\x2b", 4: b"\x23"}[dlen] msg += index.to_bytes(2, "little") + subindex.to_bytes(1, "little") msg += data + (4-dlen) * b"\x00" self._sendpacket("SDO", msg) time.sleep(10e-3) # Give EMCA time (10ms) to respond to Arduino _, msg = self._getpackets("SDO")[0] if msg[0] == 0x60: return else: # msg[0] == 0x80 signalizes SDO error, msg[4:] is error code raise base.TransmissionError("Bad SDO transaction "+msg[4:].hex())
[docs] def fhpp_state(self, actual2signed=True): """Get most recent FHPP packet (type TPDO1) received from EMCA, see EMCA-EC-C-HP.pdf page 54. Returns list of [SCON, SPOS, SDIR, actual1, actual2]. Note: actual2 is by default interpretted as signed int, actual1 is always unsigned. """ _, msg = self._getpackets("TPDO1")[0] res = [msg[i] for i in range(4)] return res + [int.from_bytes(msg[4:], "little", signed=actual2signed)]
[docs] def fhpp_send(self, ccon, cpos, cdir, data1=0, data2=0, data2signed=True): """Send basic FHPP packet (type RPDO1), see EMCA-EC-C-HP.pdf page 50. Packet structure: ccon, cpos, cdit, data1 - each size 1 byte, usigned int data2 - size 4 bytes, signed int (by default) """ msg = b"" for a in [ccon, cpos, cdir, data1]: msg += a.to_bytes(1, "little") msg += data2.to_bytes(4, "little", signed=data2signed) self._sendpacket("RPDO1", msg)
[docs] def emcy_readcodes(self): """Parse all buffered EMCY packets and return them as a list of 2-byte errorcodes in string format '0x0123'. EMCA-EC-C-HP.pdf page 255 """ return ["0x{:04x}".format(int.from_bytes(data[0:2], "little")) for _, data in self._getpackets("EMCY")]
[docs] def other_readall(self): """Return a list of all buffered uncathegorized packets. Packets are represented as tuples (packet type [str], packet data [bytes]). """ return self._getpackets("OTHER")