Source code for device.base.arduinobase

import serial
import time
from collections import namedtuple

from . import base


SerialConfig = namedtuple("SerialConfig", ["port", "baudrate", "start_msg"])

[docs] class ArduinoBase(base.HardwareBase): """Base class for Arduino-based devices. Implements ``_connect()``, ``_disconnect()`` and two utility methods: - ``_readline()``, returns stripped and decoded line - ``_write("something\\r\\n")``, encodes and writes a message :param SerialConfig serial_cfg: parameters for the serial bus :param dict kwargs: additional parameters are passed to ``HardwareBase`` """ def __init__(self, serial_cfg, **kwargs): """Constructor""" super().__init__(**kwargs) self._serial_cfg = serial_cfg self._ardu = None # ==== Inherited abstract methods ==== def _connect(self): def reset(arduino): arduino.dtr = False time.sleep(0.1) arduino.reset_input_buffer() arduino.dtr = True self.log.debug("Using serial port '%s'", self._serial_cfg.port) try: arduino = serial.Serial( port=self._serial_cfg.port, baudrate=self._serial_cfg.baudrate, ) except serial.SerialException: self.log.debug("Failed to open serial '%s'", self._serial_cfg.port) return False reset(arduino) # Temporarily set long timeout - waiting for Arduino to reset arduino.timeout = 5 recv = arduino.readline().strip().decode("ascii") if recv != self._serial_cfg.start_msg: self.log.debug("Unexpected start message '%s'", recv) arduino.close() return False arduino.timeout = 2 self._ardu = arduino return True def _disconnect(self): self._ardu.close() self._ardu = None def _is_ready(self): try: self._ardu.in_waiting return True except serial.SerialException as e: self.log.error("Serial connection failed: %s", str(e)) return False # ==== Private commands ==== @base.base_command def _readline(self): return self._ardu.readline().strip().decode("ascii") @base.base_command def _write(self, msg): return self._ardu.write(msg.encode("ascii"))