Source code for device.base.base

import time
import logging  # Native python logging
from logging.handlers import RotatingFileHandler
import functools  # To ensure a decorator preserves __doc__, __name__, ...
import pathlib

from . import _base_internals as internals

# TODO As of now method _is_ready() is a "connection check". Discuss whether
#      it should be a "device ready check" as well? (Is there any device, that
#      is communicating but refusing to do things because "it is not ready"?)

# TODO Logger writes to stderr. Should it write info and debug to stdout?

# TODO Currently there is a fixed timeout for commands trying to acquire lock.
#      That ensures that idle commands do not block other commands. Better way?

# TODO ??? Implement command queue, make it finite (max size) - probably not useful

# TODO ??? Consider an 'Exception list' that contains specific exceptions caused by
#      e.g. connection loss. This list would be implemented in subclasses - very low priority

# TODO ? Figure out how to ensure 1 connection to a device OS-wide (or some
#        kind of OS-wide locks?)

# Directory (inside the user's home directory) that holds the hardware log files.
LOG_DIR = pathlib.Path.home().joinpath(".plasmalab", "logs")


class DeviceError(Exception):
    """Root of the device exception hierarchy.

    Every other device exception inherits from this class.

    Note: ``DeviceError`` exceptions are internal in the sense that all of
    them are handled in the ``_outside_call()`` wrapper, so the user never
    comes into contact with them directly. The user only gets to see the
    resulting messages in the log.
    """
class CommandError(DeviceError):
    """The execution of a command ended with an error.

    - Raise this inside your command method to signal an error.
    - Raised automatically whenever the device itself reports an error
      (detected via ``_readerror()``).
    """
class CommandUserStop(DeviceError):
    """A command was terminated because ``stop()`` was called.

    - Raised automatically once ``stop()`` has been called.
    """
class TransmissionError(DeviceError):
    """Error related to the communication with the device.

    - Raise this from a communication check, if you implement one. A good
      place for such a check is a universal command-sender method (like
      ``raw_scpi()`` in VisaHardwareBase). If you do not have that option,
      do not bother - the main benefit is more helpful log messages.
    """
def _outside_call(func, ignore_busy_err, busy_return=None, flags=frozenset()):
    """Wrapper factory for exception handling and thread-safety.

    Context: Commands can call other commands - this creates a stack of
    calls. The first command (the one called from outside of the class -
    outside call) handles the thread-safety & exceptions, while the rest can
    ignore it.

    :param func: Command function; it is called as ``func(self, *args,
        **kwargs)`` where ``self`` is expected to be a ``HardwareBase``
        instance.
    :param bool ignore_busy_err: If True, no error is logged when the device
        lock cannot be acquired.
    :param busy_return: Value returned by the wrapper when the device lock
        cannot be acquired.
    :param flags: Iterable of flag names that are active in ``self._flags``
        while ``func`` is running. The default is an immutable empty set, so
        no state can leak between calls.
    :return: The wrapper. It returns the result of ``func``; ``busy_return``
        if the lock was not acquired; or ``None`` if the device is not
        connected or the command was terminated by a handled exception.
    :raises NotImplementedError: Re-raised from ``func`` unchanged.
    """
    name = func.__name__

    @functools.wraps(func)
    def wrapper_oc(self, *args, **kwargs):
        # Note: self is expected to be an instance of class HardwareBase
        MAIN_COMMANDS = ["connect", "disconnect", "is_ready"]
        is_main_command = name in MAIN_COMMANDS

        # Only the connection-management commands may run while disconnected
        if not self.is_connected() and not is_main_command:
            self.log.error("Cannot execute '{}', device not connected".format(name))
            return None

        try:
            # Acquire lock for this thread (fails if another command is running)
            # Note: acquire waits for up to 1s for another command to finish
            with self._device_lock:
                self._flags.clear()
                if self._status["debug_spam"]:
                    self.log.debug("Executing command '{}'".format(name))

                # Execute the command and return - or catch exceptions
                try:
                    if not is_main_command:
                        self._clear_device_errors()
                    with self._flags.context(*flags):
                        return func(self, *args, **kwargs)
                except (CommandUserStop, KeyboardInterrupt):
                    self.log.warning("'{}' terminated by user".format(name))
                except CommandError as e:
                    self.log.error("'{}' terminated: {}".format(name, e))
                    self.log.debug("Traceback for CommandError", exc_info=e)
                except TransmissionError as e:
                    self.log.error("'{}' terminated by transmission error"
                                   " (try again or reconnect)".format(name))
                    self.log.debug("TransmissionError: " + str(e), exc_info=e)
                except NotImplementedError:
                    # A missing mandatory method is a programming error:
                    # let it propagate to the caller
                    raise
                except Exception as e:
                    self.log.error("'{}' caused an unexpected error"
                                   " (connection lost?/bug?)".format(name))
                    self.log.debug("Unexpected exception!", exc_info=e)

                # Command did not finish => return device to a safe state
                # But only when connected (something could fail just after disconnect)
                if self.is_connected():
                    self.to_safestate()
                return None
        except internals.LockNotAcquired:
            if not ignore_busy_err:
                self.log.error("Cannot execute '{}', device is busy".format(name))
            if self._status["debug_spam"]:
                self.log.debug("Command '{}' locked out".format(name))
            return busy_return

    return wrapper_oc
def base_command(func):
    """Method decorator for BASE command.

    - User can terminate: no
    - Error if device busy: yes

    Usage: @base_command

    When the calling thread already owns the device lock (inside call), the
    decorated method is run directly; a pending ``stop()`` request and the
    device error queue are checked around it unless the corresponding
    ``IGNORE_*`` flag was already active. Otherwise (outside call) the
    execution is delegated to ``_outside_call()``.
    """
    @functools.wraps(func)
    def wrapper_bc(self, *args, **kwargs):
        if not self._device_lock.is_current_thread_owner():
            # Outside call: Called from outside of the class.
            # The function is wrapped in base_command once more so that this
            # wrapper runs again - this time as an inside call. Without that
            # the error checking would be skipped (it only happens during an
            # inside call).
            guarded = _outside_call(base_command(func), ignore_busy_err=False,
                                    flags={"IGNORE_USERSTOP"})
            return guarded(self, *args, **kwargs)

        # Inside call: Called by another command that already acquired lock
        active = self._flags
        honor_errors = "IGNORE_ERRORCHECK" not in active
        honor_userstop = "IGNORE_USERSTOP" not in active
        with active.context("IGNORE_USERSTOP", "IGNORE_ERRORCHECK"):
            if honor_userstop and "USERSTOP" in active:
                raise CommandUserStop("Command terminated by user")
            result = func(self, *args, **kwargs)
            if honor_userstop and "USERSTOP" in active:
                raise CommandUserStop("Command terminated by user")
            # is_connected() is consulted because func could have been
            # disconnect()
            if honor_errors and self.is_connected():
                message = self._readerror()
                if message is not None:
                    raise CommandError(message)
            return result

    return wrapper_bc
def compound_command(func):
    """Method decorator for COMPOUND command.

    - User can terminate: yes (before & after any base command is called)
    - Error if device busy: yes

    Usage: @compound_command

    An inside call (the calling thread already owns the device lock) runs
    the method directly; an outside call is delegated to
    ``_outside_call()``.
    """
    @functools.wraps(func)
    def wrapper_cc(self, *args, **kwargs):
        if not self._device_lock.is_current_thread_owner():
            guarded = _outside_call(func, ignore_busy_err=False)
            return guarded(self, *args, **kwargs)
        return func(self, *args, **kwargs)

    return wrapper_cc
def idle_command(func=None, *, busy_return=None):
    """Method decorator for IDLE command.

    - User can terminate: no
    - Error if device busy: no

    Usage: @idle_command or @idle_command(busy_return=<your return value>)

    ``busy_return`` is handed over to ``_outside_call()`` for outside calls.
    """
    if func is None:
        # Syntax trick: allow usage of @idle_command without '()'
        return functools.partial(idle_command, busy_return=busy_return)

    @functools.wraps(func)
    def wrapper_ic(self, *args, **kwargs):
        if not self._device_lock.is_current_thread_owner():
            # Outside call: lock and exception handling is delegated
            guarded = _outside_call(func, ignore_busy_err=True,
                                    busy_return=busy_return,
                                    flags={"IGNORE_USERSTOP"})
            return guarded(self, *args, **kwargs)
        # Inside call: run directly while ignoring user stop requests
        with self._flags.context("IGNORE_USERSTOP"):
            return func(self, *args, **kwargs)

    return wrapper_ic
class HardwareBase:
    """Base class for communication with devices. Thread-safe methods.

    Any method that communicates with the device either directly or by
    calling other methods is referred to as *command* and MUST be decorated
    with one of the following: ``@base_command``, ``@compound_command``,
    ``@idle_command``. Note: this does not apply to private abstract methods
    (e.g. ``_connect()``, ...).

    ``@base_command``
        - Base commands usually communicate with the device directly and
          implement elementary tasks (e.g. set output voltage). They cannot
          be terminated by user.

    ``@compound_command``
        - Compound commands implement more complex tasks using a set of
          base commands and other compound commands. They can be terminated
          by calling ``stop()``. Tip: Any method decorated as a
          @compound_command can be instead decorated as a @base_command to
          forbid termination by user.

    ``@idle_command``
        - Idle commands implement low-priority tasks that are called
          periodically in background (e.g. read and plot current data from
          oscilloscope). They do not trigger error when they are blocked by
          the execution of another command. They ignore ``stop()`` calls.

    If you reimplement __init__, make sure to call super().__init__ in it.

    It is REQUIRED to implement following methods in a subclass, see their
    docstrings for more info. To implement these methods both commands and
    direct communication with the device can be used. However, do not
    decorate any of these with command decorators. Also none of these
    methods will be interrupted by a ``stop()`` call::

        def _connect(self):         # Must return True/False
        def _disconnect(self):
        def _is_ready(self):        # Must return True/False

    It is OPTIONAL to implement these. Notes from above apply here as well::

        def _safestate(self):
        def _readerror(self):       # Must return str or None
        def _defaultstate(self):
        def _clear_defaultstate(self):
    """

    def __init__(self, debug_spam=False, log_callback=None, logger="main",
                 ignore_log_conflict=False):
        """Constructor.

        :param bool debug_spam: Set True to enable more debug messages
            (e.g. log all called commands). Debug messages logged to file.
        :param func log_callback: Function that receives
            ``logging.LogRecord`` every time a log record is created.
        :param logging.Logger logger: A ``logging.Logger`` instance or
            ``str``. Change the name when using the same device class for
            multiple devices on one machine to prevent logging conflict.
            ``None`` is treated like the default name ``"main"``.
        :param bool ignore_log_conflict: If False (default), a
            ``RuntimeError`` is raised when the given logger already has
            handlers attached. If True, the existing handlers are shared
            and only a warning is logged.
        :raises RuntimeError: On logging conflict, see
            ``ignore_log_conflict``.
        """
        # Current state
        self._status = {
            "connected": False,         # Connected to device
            "connection_failures": 0,   # Consecutive failed connection attempts
            "debug_spam": debug_spam
        }
        # Flags tied to the execution of current command (cleared afterwards)
        self._flags = internals.ContextSet({}, only_these={
            "USERSTOP", "IGNORE_USERSTOP", "IGNORE_ERRORCHECK"
        })
        # Thread lock - used when sending commands
        self._device_lock = internals.DeviceLock()

        # Logging and stuff
        if isinstance(logger, str):
            logger = logging.getLogger(logger)
        elif logger is None:
            logger = logging.getLogger("main")
        self._init_logger(logger, False, callback=log_callback,
                          ignore_conflict=ignore_log_conflict)

    def __del__(self):
        # 'log' may be missing if __init__ failed before the logger was set up
        if hasattr(self, "log"):
            # Detach every handler attached directly to this instance's logger
            while self.log.hasHandlers() and self.log.handlers:
                self.log.removeHandler(self.log.handlers[0])

    def _init_logger(self, logger, debug_to_console, callback=None,
                     ignore_conflict=False):
        """Setup file and stderr logger using native python lib logging.

        The file and console handlers are attached to ``logger``; records
        are emitted through its child logger ``"hw"`` stored in
        ``self.log``. The optional custom handler is attached to
        ``self.log`` only.

        :param logging.Logger logger: Parent logger.
        :param bool debug_to_console: If True the console handler emits
            DEBUG records as well, otherwise INFO and above.
        :param func callback: Optional function called with every
            ``logging.LogRecord`` that reaches ``self.log``.
        :param bool ignore_conflict: If False, raise ``RuntimeError`` when
            ``logger`` already has handlers; if True, reuse them and log a
            warning.
        :raises RuntimeError: On logging conflict, see ``ignore_conflict``.
        """
        self.log = logger.getChild("hw")
        if logger.handlers and not ignore_conflict:
            raise RuntimeError("Logging conflict (specify unique logger in constructor)")

        logger.setLevel(logging.DEBUG)
        default_format = "[%(asctime)s] %(levelname)-8s %(name)-8s: %(message)s"

        def get_file_handler():
            """1. File handler

            Max 3 log files, each max 500 lines (estimated as 80 bytes per
            line), oldest entries are overwritten
            """
            def sanitize(s):
                return "".join(c if c.isalnum() else "_" for c in s)

            name = type(self).__name__ + "#" + sanitize(logger.name)
            file = LOG_DIR / "hw_{}.log".format(name)
            file.parent.mkdir(exist_ok=True, parents=True)
            handler = RotatingFileHandler(file, "a", maxBytes=80*500, backupCount=2)
            handler.setFormatter(logging.Formatter(default_format))
            handler.setLevel(logging.DEBUG)
            return handler

        def get_console_handler():
            """2. Console handler (``logging.StreamHandler()`` without an
            explicit stream writes to stderr)"""
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                type(self).__name__ + " " + default_format, datefmt="%X"))
            handler.setLevel(logging.DEBUG if debug_to_console else logging.INFO)
            return handler

        def get_custom_handler(callback):
            """3. Custom handler (typically DeviceClient, emits log to user)"""
            class MyHandler(logging.Handler):
                def emit(self, record):
                    callback(record)

            handler = MyHandler(logging.DEBUG)
            handler.setFormatter(logging.Formatter("%(name)s - %(message)s"))
            return handler

        if not logger.handlers:     # Prevent duplicate logging
            logger.addHandler(get_file_handler())
            logger.addHandler(get_console_handler())
        else:
            # Logger.warn() is a deprecated alias (removed in Python 3.13)
            self.log.warning("Logging conflict (multiple devices log to one file)")
        self.log.info(16 * "=" + " New instance started " + 16 * "=")

        if callback is not None:
            self.log.addHandler(get_custom_handler(callback=callback))

    def _clear_device_errors(self):
        """Clear any errors from device error queue using ``_readerror()``.

        A warning is logged if queue was not empty.
        """
        err_count = 0
        # Error checks and user stop are suppressed while draining the queue
        with self._flags.context("IGNORE_ERRORCHECK", "IGNORE_USERSTOP"):
            err = self._readerror()
            while err is not None:
                err_count += 1
                self.log.debug("Unhandled error: {}".format(err))
                err = self._readerror()
        if err_count > 0:
            self.log.warning("Ignoring unhandled {} device errors (see log file)".format(err_count))

    # ==== Virtual methods (implement in a subclass) ====

    def _connect(self):
        """Implement your connection procedure here.

        It is called only when there is no active connection.
        Return True/False - success or not.
        """
        raise NotImplementedError("_connect() must be implemeted")

    def _disconnect(self):
        """Implement your disconnect procedure here.

        It is called when user intends to close a connection but also when
        a connection is lost. Thus you should not rely on the connection to
        be valid. Instead handle any exceptions that may be raised in that
        situation.
        """
        raise NotImplementedError("_disconnect() must be implemeted")

    def _is_ready(self):
        """Implement your connection check procedure here.

        E.g. send a simple command to the device and check whether it
        responds. Return True/False - success or not.

        Note: Returning False triggers automatic disconnect.
        """
        raise NotImplementedError("_is_ready() must be implemeted")

    def _safestate(self):
        """(Optional method) Implement your safety kill-switch that turns
        off anything dangerous (i.e. voltage out) after a command failed to
        finish.

        Use either existing commands of any type or communicate with the
        device directly.
        """
        raise NotImplementedError   # This does not propagate to user

    def _readerror(self):
        """(Optional method) Implement your procedure to read last error
        from device error queue.

        Return value must be either string (error info) or None (no new
        error).

        Note: Errors are checked after each base command. If an error is
        received, current command is terminated and the error is logged.
        """
        return None

    def _defaultstate(self):
        """(Optional method) Set device to a well defined state.

        Called when a connection to device is established.
        """
        raise NotImplementedError   # This does not propagate to user

    def _clear_defaultstate(self):
        """(Optional method) Politely revert remote-control related settings
        (e.g. keys lock).

        Called before disconnecting from device.
        """
        raise NotImplementedError   # This does not propagate to user

    # ==== Public methods ====

    def is_connected(self):
        """Check if connected to a hardware device.

        Note: Connection validity is not checked - to do that call
        ``is_ready()`` instead.
        """
        return self._status["connected"]

    @base_command
    def connect(self):
        """Connect to the device.

        Note: When already connected, connection validity is not checked -
        to do that call ``is_ready()`` instead.

        :return: True if connected (or already connected), False if the
            connection attempt failed.
        """
        if not self.is_connected():
            if self._connect():
                self._status["connection_failures"] = 0
                # Re-enable logging (it may have been suspended, see below)
                self.log.setLevel(logging.DEBUG)
                self.log.info("Connected to device!")
                self._status["connected"] = True
            else:
                self.log.error("Cannot connect to device")
                if self._status["connection_failures"] + 1 < 3:
                    self._status["connection_failures"] += 1
                else:
                    # Avoid flooding the log with repeated failures
                    self.log.debug("Logging suspended after 3 attempts")
                    self.log.setLevel(logging.CRITICAL)
                return False

            # Clear any errors left over in the device
            try:
                self._clear_device_errors()
            except Exception as e:
                self.log.warning("Failed to clear device errors")
                self.log.debug("Unexpected exception!", exc_info=e)

            # Init default state
            try:
                self.to_defaultstate()
            except Exception as e:
                self.log.warning("Failed to initialize device to default state")
                self.log.debug("Unexpected exception!", exc_info=e)
            return True
        else:
            self.log.info("Already connected to device")
            return True

    @base_command
    def disconnect(self):
        """Disconnect from the device."""
        if self.is_connected():
            # Best effort: a failed clean-up must not prevent the disconnect
            try:
                self.clear_defaultstate()
            except Exception as e:
                self.log.warning("Failed to clean up default state settings")
                self.log.debug("Unexpected exception!", exc_info=e)
            self._disconnect()
            self._status["connected"] = False
            self.log.info("Disconnected from device")

    @idle_command(busy_return=True)
    def is_ready(self):
        """Check if connection to device was opened and if it is responding.

        If there is no connection, try to open a new one. If an open
        connection does not respond, it is closed.

        :return: True if the device responds or a new connection was
            opened, False otherwise. When the device is busy with another
            command, True is returned (``busy_return=True``).
        """
        if self.is_connected():
            # Check the connection
            if self._is_ready():
                return True
            # No response, connection invalid => disconnect
            else:
                self.log.error("Connection lost")
                self.disconnect()
                return False
        else:
            return self.connect()

    def stop(self):
        """Signal the current command (if any) to stop its execution as
        soon as possible.

        After command stops, device is brought to a safe state.
        Note: Idle commands are not affected.
        """
        if self._device_lock.locked():
            self._flags.add("USERSTOP")
            if self._status["debug_spam"]:
                self.log.debug("User requested command termination")

    def to_safestate(self):
        """Return device to safe state (software kill-switch).

        This essentially means "turn off anything that can be turned off".

        :raises KeyboardInterrupt: Re-raised after a critical log message
            if the procedure is interrupted.
        """
        # DevNote: This method is not a command of any type (base/comp./idle)
        # to avoid infinite recursion (when any command fails, this is called).
        if not self.is_connected():
            self.log.error("Cannot execute 'to_safestate', device not connected")
            return

        if self._status["debug_spam"]:
            self.log.debug("Executing 'to_safestate'")
        try:
            if self._device_lock.is_current_thread_owner():
                # Lock is already held by this thread, do not acquire it again
                with self._flags.context("IGNORE_USERSTOP", "IGNORE_ERRORCHECK"):
                    self._safestate()
            else:
                with self._device_lock, self._flags.context("IGNORE_USERSTOP", "IGNORE_ERRORCHECK"):
                    self._safestate()
            self.log.info("Device returned to safe state")
        except internals.LockNotAcquired:
            self.log.error("Cannot execute 'to_safestate', device is busy")
        except NotImplementedError:
            # _safestate() is optional
            pass
        except KeyboardInterrupt:
            self.log.critical("Return to safe state aborted! Please safe-check device MANUALLY!")
            raise
        except Exception as e:
            self.log.critical("Failed to return to safe state! Please safe-check device MANUALLY!")
            self.log.debug("Exception "+str(type(e))+" "+str(e), exc_info=e)
            self.disconnect()

    @base_command
    def to_defaultstate(self):
        """Initialize device to default state.

        Nothing happens (and nothing is logged) if ``_defaultstate()`` is
        not implemented.
        """
        try:
            self._defaultstate()
        except NotImplementedError:
            pass
        else:
            self.log.info("Device initialized to default state")

    @base_command
    def clear_defaultstate(self):
        """Release device from default state (e.g. turn off key-lock).

        Nothing happens (and nothing is logged) if
        ``_clear_defaultstate()`` is not implemented.
        """
        try:
            self._clear_defaultstate()
        except NotImplementedError:
            pass
        else:
            self.log.info("Default state settings cleaned up")