# Source code for device.base.arduinobase
import serial
import time
from collections import namedtuple
from . import base
# Serial-bus settings consumed by ``ArduinoBase``: the port to open, its baud
# rate and the start message expected as the first line after a reset.
SerialConfig = namedtuple("SerialConfig", "port baudrate start_msg")
# [docs]
class ArduinoBase(base.HardwareBase):
    """Base class for Arduino-based devices.

    Implements ``_connect()``, ``_disconnect()`` and ``_is_ready()`` on top
    of a serial port, plus two utility methods:

    - ``_readline()``, returns stripped and decoded line
    - ``_write("something\\r\\n")``, encodes and writes a message

    :param SerialConfig serial_cfg: parameters for the serial bus
    :param dict kwargs: additional parameters are passed to ``HardwareBase``
    """

    def __init__(self, serial_cfg, **kwargs):
        """Store the serial configuration; the port is opened by ``_connect()``."""
        super().__init__(**kwargs)
        self._serial_cfg = serial_cfg
        # Open ``serial.Serial`` handle, or ``None`` while disconnected.
        self._ardu = None

    # ==== Inherited abstract methods ====

    def _connect(self):
        """Open the serial port, reset the device and check its start message.

        The port is stored in ``self._ardu`` only when the first line
        received after the reset equals ``serial_cfg.start_msg``. On every
        failure path the port (if it was opened) is closed again.

        :return: ``True`` on success, ``False`` otherwise
        :rtype: bool
        """

        def reset(arduino):
            # Toggle DTR low -> high and discard anything buffered in between.
            # NOTE(review): presumably DTR drives the board's reset line -- confirm.
            arduino.dtr = False
            time.sleep(0.1)
            arduino.reset_input_buffer()
            arduino.dtr = True

        port = self._serial_cfg.port
        self.log.debug("Using serial port '%s'", port)
        try:
            arduino = serial.Serial(
                port=port,
                baudrate=self._serial_cfg.baudrate,
            )
        except serial.SerialException:
            self.log.debug("Failed to open serial '%s'", port)
            return False

        try:
            reset(arduino)
            # Temporarily set long timeout - waiting for Arduino to reset
            arduino.timeout = 5
            recv = arduino.readline().strip().decode("ascii")
        except (serial.SerialException, UnicodeDecodeError) as e:
            # A failing port or non-ASCII boot noise must not leak the open
            # handle; treat it like any other failed connection attempt.
            self.log.debug("Failed to read start message: %s", e)
            arduino.close()
            return False

        if recv != self._serial_cfg.start_msg:
            self.log.debug("Unexpected start message '%s'", recv)
            arduino.close()
            return False

        # Handshake done: switch to the shorter timeout used from now on.
        arduino.timeout = 2
        self._ardu = arduino
        return True

    def _disconnect(self):
        """Close the serial port, if one is open, and forget the handle."""
        # Clear the attribute first so it is reset even if ``close()`` raises.
        arduino, self._ardu = self._ardu, None
        if arduino is not None:
            arduino.close()

    def _is_ready(self):
        """Check that the serial port can still be queried.

        :return: ``True`` if reading ``in_waiting`` succeeds, ``False`` when
            no port is stored or the query raises ``SerialException``
        :rtype: bool
        """
        if self._ardu is None:
            return False
        try:
            # Only the attribute access matters; the value is not used.
            self._ardu.in_waiting
        except serial.SerialException as e:
            self.log.error("Serial connection failed: %s", str(e))
            return False
        return True

    # ==== Private commands ====

    @base.base_command
    def _readline(self):
        """Read one line from the port.

        :return: the line with surrounding whitespace stripped, decoded as
            ASCII
        :rtype: str
        """
        return self._ardu.readline().strip().decode("ascii")

    @base.base_command
    def _write(self, msg):
        """Encode ``msg`` as ASCII and write it to the port.

        :param str msg: message to send, including any line terminator
        :return: whatever ``Serial.write()`` returns
        """
        return self._ardu.write(msg.encode("ascii"))