import time
import logging # Native python logging
from logging.handlers import RotatingFileHandler
import functools # To ensure a decorator preserves __doc__, __name__, ...
import pathlib
from . import _base_internals as internals
# TODO As of now method _is_ready() is a "connection check". Discuss whether
# it should be a "device ready check" as well? (Is there any device, that
# is communicating but refusing to do things because "it is not ready"?)
# TODO Logger writes to stderr. Should it write info and debug to stdout?
# TODO Currently there is a fixed timeout for commands trying to acquire lock.
# That ensures that idle commands do not block other commands. Better way?
# TODO ??? Implement command queue, make it finite (max size) - probably not useful
# TODO ??? Consider an 'Exception list' that contains specific exceptions caused by
# e.g. connection loss. This list would be implemented in subclasses - very low priority
# TODO ? Figure out how to ensure only one connection to a device OS-wide
# (or use some kind of OS-wide lock?)
# Directory holding the per-device log files; it is created on demand when
# the file handler is set up in HardwareBase._init_logger().
LOG_DIR = pathlib.Path("~/.plasmalab/logs").expanduser()
[docs]
class DeviceError(Exception):
    """Root of the device exception hierarchy; all other errors inherit from it.

    Note: ``DeviceError`` exceptions are internal in the sense that they are
    handled by the ``_outside_call()`` wrapper, so the user does not come into
    contact with them directly and only sees the resulting log messages.
    """
[docs]
class CommandError(DeviceError):
    """Execution of a command ended with an error.

    - Raise this in your command method to signal an error.
    - Raised automatically by the ``base_command`` wrapper when the device
      reports an error (via ``_readerror()``).
    """
[docs]
class CommandUserStop(DeviceError):
    """A command was terminated because ``stop()`` was called.

    - Raised automatically by the ``base_command`` wrapper once the
      ``USERSTOP`` flag has been set by ``stop()``.
    """
[docs]
class TransmissionError(DeviceError):
    """Communication-related error.

    - Raise this in a communication check (if you implement any).

    It is useful to implement the communication check in a universal
    command-sender method (like ``raw_scpi()`` in VisaHardwareBase). If you do
    not have this option, do not bother; it mainly helps to produce helpful
    log messages.
    """
def _outside_call(func, ignore_busy_err, busy_return=None, flags=None):
    """Wrapper factory for exception handling and thread-safety.

    Context: Commands can call other commands - this creates a stack of calls.
    The first command (the one called from outside of the class - outside call)
    handles the thread-safety & exceptions, while the rest can ignore it.

    :param func: Command implementation, invoked as ``func(self, *args, **kwargs)``.
    :param bool ignore_busy_err: When True, no error is logged if the device
        lock cannot be acquired.
    :param busy_return: Value returned when the device lock cannot be acquired.
    :param flags: Iterable of flag names that are active while ``func`` runs.
        ``None`` (default) means no flags.
    :return: Wrapper returning the result of ``func``; ``None`` when the device
        is not connected or the command failed; ``busy_return`` when the device
        is busy. ``NotImplementedError`` is re-raised to the caller.
    """
    # An immutable default replaces the former shared mutable ``set()`` default.
    if flags is None:
        flags = ()
    # Commands that are allowed to run without an open connection.
    main_commands = ("connect", "disconnect", "is_ready")

    @functools.wraps(func)
    def wrapper_oc(self, *args, **kwargs):
        # Note: self is expected to be an instance of class HardwareBase
        name = func.__name__
        is_main = name in main_commands
        if not self.is_connected() and not is_main:
            self.log.error("Cannot execute '{}', device not connected".format(name))
            return None
        try:
            # Acquire the device lock for this thread; LockNotAcquired is
            # raised when another command keeps holding it.
            # NOTE(review): the original comment says acquisition waits up to
            # 1 s - that timeout lives in DeviceLock, not visible here.
            with self._device_lock:
                self._flags.clear()
                if self._status["debug_spam"]:
                    self.log.debug("Executing command '{}'".format(name))
                # Execute the command and return - or catch exceptions
                try:
                    if not is_main:
                        self._clear_device_errors()
                    with self._flags.context(*flags):
                        return func(self, *args, **kwargs)
                except (CommandUserStop, KeyboardInterrupt):
                    self.log.warning("'{}' terminated by user".format(name))
                except CommandError as e:
                    self.log.error("'{}' terminated: {}".format(name, e))
                    self.log.debug("Traceback for CommandError", exc_info=e)
                except TransmissionError as e:
                    self.log.error("'{}' terminated by transmission error"
                                   " (try again or reconnect)".format(name))
                    self.log.debug("TransmissionError: " + str(e), exc_info=e)
                except NotImplementedError:
                    # Missing mandatory implementation: let the caller see it.
                    raise
                except Exception as e:
                    self.log.error("'{}' caused an unexpected error"
                                   " (connection lost?/bug?)".format(name))
                    self.log.debug("Unexpected exception!", exc_info=e)
                # Command did not finish => return device to a safe state
                # But only when connected (something could fail just after disconnect)
                if self.is_connected():
                    self.to_safestate()
                return None
        except internals.LockNotAcquired:
            if not ignore_busy_err:
                self.log.error("Cannot execute '{}', device is busy".format(name))
            if self._status["debug_spam"]:
                self.log.debug("Command '{}' locked out".format(name))
            return busy_return
    return wrapper_oc
[docs]
def base_command(func):
    """Method decorator for BASE command.

    - User can terminate: no
    - Error if device busy: yes

    Usage: @base_command
    """
    @functools.wraps(func)
    def wrapper_bc(self, *args, **kwargs):
        # Outside call: called from outside of the class (lock not held yet).
        if not self._device_lock.is_current_thread_owner():
            # Wrap func in base_command once more so that this wrapper runs
            # again as an inside call; error checking happens only there.
            guarded = _outside_call(base_command(func), ignore_busy_err=False,
                                    flags={"IGNORE_USERSTOP"})
            return guarded(self, *args, **kwargs)

        # Inside call: an enclosing command already holds the lock.
        # Remember what the enclosing command allows before masking the flags.
        honour_errors = "IGNORE_ERRORCHECK" not in self._flags
        honour_stop = "IGNORE_USERSTOP" not in self._flags

        def abort_if_stopped():
            if honour_stop and "USERSTOP" in self._flags:
                raise CommandUserStop("Command terminated by user")

        with self._flags.context("IGNORE_USERSTOP", "IGNORE_ERRORCHECK"):
            abort_if_stopped()
            result = func(self, *args, **kwargs)
            abort_if_stopped()
            # is_connected() is consulted because func may have been disconnect()
            if honour_errors and self.is_connected():
                device_error = self._readerror()
                if device_error is not None:
                    raise CommandError(device_error)
        return result
    return wrapper_bc
[docs]
def compound_command(func):
    """Method decorator for COMPOUND command.

    - User can terminate: yes (before & after any base command is called)
    - Error if device busy: yes

    Usage: @compound_command
    """
    @functools.wraps(func)
    def wrapper_cc(self, *args, **kwargs):
        # Inside call -> run func directly; outside call -> go through the
        # lock-acquiring, exception-handling wrapper.
        inside_call = self._device_lock.is_current_thread_owner()
        target = func if inside_call else _outside_call(func, ignore_busy_err=False)
        return target(self, *args, **kwargs)
    return wrapper_cc
[docs]
def idle_command(func=None, *, busy_return=None):
    """Method decorator for IDLE command.

    - User can terminate: no
    - Error if device busy: no

    Usage: @idle_command or @idle_command(busy_return=<your return value>)
    """
    # Called with arguments only (``@idle_command(busy_return=...)``): hand
    # back a decorator that still expects the function.
    if func is None:
        return functools.partial(idle_command, busy_return=busy_return)

    @functools.wraps(func)
    def wrapper_ic(self, *args, **kwargs):
        if not self._device_lock.is_current_thread_owner():
            # Outside call: a busy device is not an error, busy_return is returned
            guarded = _outside_call(func, ignore_busy_err=True,
                                    busy_return=busy_return,
                                    flags={"IGNORE_USERSTOP"})
            return guarded(self, *args, **kwargs)
        # Inside call: run with user-stop requests masked
        with self._flags.context("IGNORE_USERSTOP"):
            return func(self, *args, **kwargs)
    return wrapper_ic
[docs]
class HardwareBase:
"""Base class for communication with devices. Thread-safe methods.
Any method that communicates with the device either directly or by calling
other methods is referred to as *command* and MUST be decorated with one of
the following: ``@base_command``, ``@compound_command``, ``@idle_command``.
Note: this does not apply to private abstract methods (e.g. ``_connect()``, ...).
``@base_command`` - Base commands usually communicate with the device directly
and implement elementary tasks (e.g. set output voltage). They cannot be
terminated by user.
``@compound_command`` - Compound commands implement more complex tasks using
a set of base commands and other compound commands. They can be terminated
by calling ``stop()``.
Tip: Any method decorated as a @compound_command can be instead decorated
as a @base_command to forbid termination by user.
``@idle_command`` - Idle commands implement low-priority tasks that are
called periodically in background (e.g. read and plot current data from
oscilloscope). They do not trigger an error when they are blocked by
the execution of another command. They ignore ``stop()`` calls.
If you reimplement __init__, make sure to call super().__init__ in it.
It is REQUIRED to implement the following methods in a subclass, see their
docstrings for more info. To implement these methods both commands and
direct communication with the device can be used. However, do not decorate
any of these with command decorators. Also none of these methods will be
interrupted by a ``stop()`` call::
def _connect(self): # Must return True/False
def _disconnect(self):
def _is_ready(self): # Must return True/False
It is OPTIONAL to implement these. Notes from above apply here as well::
def _safestate(self):
def _readerror(self): # Must return str or None
def _defaultstate(self):
def _clear_defaultstate(self):
"""
def __init__(self, debug_spam=False, log_callback=None, logger="main",
             ignore_log_conflict=False):
    """Constructor.

    :param bool debug_spam: Set True to enable more debug messages (e.g.
        log all called commands). Debug messages are logged to file.
    :param func log_callback: Function that receives ``logging.LogRecord``
        every time a log record is created.
    :param logging.Logger logger: A ``logging.Logger`` instance or ``str``
        (``None`` selects the logger named "main"). Change the name when
        using the same device class for multiple devices on one machine to
        prevent a logging conflict.
    :param bool ignore_log_conflict: Set True to log a warning instead of
        raising ``RuntimeError`` when the logger already has handlers.
    """
    # Current state
    self._status = {
        "connected": False,        # Connected to device
        "connection_failures": 0,  # Consecutive failed connection attempts
        "debug_spam": debug_spam,
    }
    # Flags tied to the execution of current command (cleared afterwards)
    allowed_flags = {"USERSTOP", "IGNORE_USERSTOP", "IGNORE_ERRORCHECK"}
    self._flags = internals.ContextSet({}, only_these=allowed_flags)
    # Thread lock - used when sending commands
    self._device_lock = internals.DeviceLock()
    # Logging: a name (or None) is resolved to a logger object first
    if logger is None or isinstance(logger, str):
        logger_name = "main" if logger is None else logger
        logger = logging.getLogger(logger_name)
    self._init_logger(logger, False, callback=log_callback,
                      ignore_conflict=ignore_log_conflict)
def __del__(self):
    """Detach every handler registered directly on this instance's logger."""
    # ``log`` may be missing when __init__ failed before the logger was set up
    if not hasattr(self, "log"):
        return
    device_log = self.log
    while device_log.hasHandlers() and device_log.handlers:
        first = device_log.handlers[0]
        device_log.removeHandler(first)
def _init_logger(self, logger, debug_to_console, callback=None, ignore_conflict=False):
    """Set up file, console and optional callback logging via ``logging``.

    ``self.log`` becomes the ``"hw"`` child of ``logger``. File and console
    handlers are attached to ``logger`` itself, but only if it has no
    handlers yet; the callback handler is attached to ``self.log``.

    :param logging.Logger logger: Parent logger of this device.
    :param bool debug_to_console: Console handler level is DEBUG when True,
        INFO otherwise.
    :param func callback: Optional function that receives every
        ``logging.LogRecord`` emitted through ``self.log``.
    :param bool ignore_conflict: When ``logger`` already has handlers, log a
        warning instead of raising.
    :raises RuntimeError: ``logger`` already has handlers and
        ``ignore_conflict`` is False.
    """
    self.log = logger.getChild("hw")
    if logger.handlers and not ignore_conflict:
        raise RuntimeError("Logging conflict (specify unique logger in constructor)")
    logger.setLevel(logging.DEBUG)
    default_format = "[%(asctime)s] %(levelname)-8s %(name)-8s: %(message)s"

    def get_file_handler():
        """1. File handler

        Up to 3 files (current + 2 backups), each capped at 80*500 bytes,
        i.e. roughly 500 lines of 80 characters; oldest entries are dropped.
        """
        def sanitize(s):
            return "".join(c if c.isalnum() else "_" for c in s)

        name = type(self).__name__ + "#" + sanitize(logger.name)
        file = LOG_DIR / "hw_{}.log".format(name)
        file.parent.mkdir(exist_ok=True, parents=True)
        handler = RotatingFileHandler(file, "a", maxBytes=80*500, backupCount=2)
        handler.setFormatter(logging.Formatter(default_format))
        handler.setLevel(logging.DEBUG)
        return handler

    def get_console_handler():
        """2. Console handler (``StreamHandler()`` defaults to stderr)"""
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            type(self).__name__ + " " + default_format, datefmt="%X"))
        handler.setLevel(logging.DEBUG if debug_to_console else logging.INFO)
        return handler

    def get_custom_handler(callback):
        """3. Custom handler (typically DeviceClient, emits log to user)"""
        class MyHandler(logging.Handler):
            def emit(self, record):
                callback(record)

        handler = MyHandler(logging.DEBUG)
        handler.setFormatter(logging.Formatter("%(name)s - %(message)s"))
        return handler

    if not logger.handlers:  # Prevent duplicate logging
        logger.addHandler(get_file_handler())
        logger.addHandler(get_console_handler())
    else:
        # Logger.warn() is deprecated and was removed in Python 3.13
        self.log.warning("Logging conflict (multiple devices log to one file)")
    self.log.info(16 * "=" + " New instance started " + 16 * "=")
    if callback is not None:
        self.log.addHandler(get_custom_handler(callback=callback))
def _clear_device_errors(self):
    """Drain the device error queue by calling ``_readerror()`` until it
    returns ``None``.

    Each drained error is logged at debug level; a single warning with the
    total count is logged if the queue was not empty.
    """
    drained = 0
    with self._flags.context("IGNORE_ERRORCHECK", "IGNORE_USERSTOP"):
        while True:
            message = self._readerror()
            if message is None:
                break
            drained += 1
            self.log.debug("Unhandled error: {}".format(message))
    if drained:
        self.log.warning("Ignoring unhandled {} device errors (see log file)".format(drained))
# ==== Virtual methods (implement in a subclass) ====
def _connect(self):
    """Implement your connection procedure here. It is called only when
    there is no active connection.

    Return True/False - success or not.

    :raises NotImplementedError: Always, unless overridden in a subclass.
    """
    # Typo in the message fixed ("implemeted" -> "implemented")
    raise NotImplementedError("_connect() must be implemented")
def _disconnect(self):
    """Implement your disconnect procedure here. It is called when the user
    intends to close a connection but also when a connection is lost. Thus
    you should not rely on the connection being valid. Instead handle any
    exceptions that may be raised in that situation.

    :raises NotImplementedError: Always, unless overridden in a subclass.
    """
    # Typo in the message fixed ("implemeted" -> "implemented")
    raise NotImplementedError("_disconnect() must be implemented")
def _is_ready(self):
    """Implement your connection check procedure here. E.g. send a simple
    command to the device and check whether it responds.

    Return True/False - success or not.
    Note: Returning False makes ``is_ready()`` disconnect from the device.

    :raises NotImplementedError: Always, unless overridden in a subclass.
    """
    # Typo in the message fixed ("implemeted" -> "implemented")
    raise NotImplementedError("_is_ready() must be implemented")
def _safestate(self):
    """(Optional method) Safety kill-switch: turn off anything dangerous
    (e.g. voltage output) after a command failed to finish.

    Use either existing commands of any type or communicate with the device
    directly.
    """
    # Swallowed by to_safestate(), so it never reaches the user
    raise NotImplementedError()
def _readerror(self):
    """(Optional method) Read the last error from the device error queue.

    The return value must be either a string (error info) or ``None``
    (no new error).

    Note: Errors are checked after each base command. If an error is
    received, the current command is terminated and the error is logged.
    """
    # Default implementation: the device never reports an error
    return None
def _defaultstate(self):
    """(Optional method) Put the device into a well-defined state. Called
    when a connection to the device is established.
    """
    # Swallowed by to_defaultstate(), so it never reaches the user
    raise NotImplementedError()
def _clear_defaultstate(self):
    """(Optional method) Politely revert remote-control related settings
    (e.g. key lock). Called before disconnecting from the device.
    """
    # Swallowed by clear_defaultstate(), so it never reaches the user
    raise NotImplementedError()
# ==== Public methods ====
[docs]
def is_connected(self):
    """Tell whether a connection to the hardware device is marked as open.

    Note: Connection validity is not checked - to do that call
    ``is_ready()`` instead.
    """
    status = self._status
    return status["connected"]
[docs]
@base_command
def connect(self):
    """Connect to the device.

    Note: When already connected, connection validity is not checked - to do
    that call ``is_ready()`` instead.

    :return: True when connected (or already connected), False when the
        connection attempt failed.
    """
    if self.is_connected():
        self.log.info("Already connected to device")
        return True

    if not self._connect():
        self.log.error("Cannot connect to device")
        if self._status["connection_failures"] + 1 < 3:
            self._status["connection_failures"] += 1
        else:
            # Third failure in a row: silence the log until a connect succeeds
            self.log.debug("Logging suspended after 3 attempts")
            self.log.setLevel(logging.CRITICAL)
        return False

    self._status["connection_failures"] = 0
    self.log.setLevel(logging.DEBUG)
    self.log.info("Connected to device!")
    self._status["connected"] = True

    # Best-effort follow-up steps: clear errors left over in the device,
    # then initialise the default state. A failure is only logged.
    follow_up = (
        (self._clear_device_errors, "Failed to clear device errors"),
        (self.to_defaultstate, "Failed to initialize device to default state"),
    )
    for step, failure_note in follow_up:
        try:
            step()
        except Exception as exc:
            self.log.warning(failure_note)
            self.log.debug("Unexpected exception!", exc_info=exc)
    return True
[docs]
@base_command
def disconnect(self):
    """Disconnect from the device (no-op when not connected)."""
    if not self.is_connected():
        return
    # Reverting the default state is best effort; a failure is only logged
    try:
        self.clear_defaultstate()
    except Exception as exc:
        self.log.warning("Failed to clean up default state settings")
        self.log.debug("Unexpected exception!", exc_info=exc)
    self._disconnect()
    self._status["connected"] = False
    self.log.info("Disconnected from device")
[docs]
@idle_command(busy_return=True)
def is_ready(self):
    """Check if a connection to the device was opened and if it is responding.

    When not connected, try to open a new connection and return its result.
    When connected but the device does not respond, disconnect and return
    False.
    """
    if not self.is_connected():
        return self.connect()
    if self._is_ready():
        return True
    # No response: the connection is invalid, so drop it
    self.log.error("Connection lost")
    self.disconnect()
    return False
[docs]
def stop(self):
"""Signal the currently running command (if any) to stop its execution as
soon as possible. After the command stops, the device is brought to a safe
state.
Note: Idle commands are not affected.
"""
# The USERSTOP flag is raised only while some command holds the device lock
if self._device_lock.locked():
self._flags.add("USERSTOP")
if self._status["debug_spam"]:
self.log.debug("User requested command termination")
[docs]
def to_safestate(self):
    """Return the device to a safe state (software kill-switch). This
    essentially means "turn off anything that can be turned off".

    If the kill-switch itself fails unexpectedly, a critical message is
    logged and the device is disconnected.
    """
    # DevNote: This method is not a command of any type (base/comp./idle)
    # to avoid infinite recursion (when any command fails, this is called).
    if not self.is_connected():
        self.log.error("Cannot execute 'to_safestate', device not connected")
        return
    if self._status["debug_spam"]:
        self.log.debug("Executing 'to_safestate'")

    def run_killswitch():
        # User-stop requests and device error checks are masked meanwhile
        with self._flags.context("IGNORE_USERSTOP", "IGNORE_ERRORCHECK"):
            self._safestate()

    try:
        if self._device_lock.is_current_thread_owner():
            run_killswitch()
        else:
            # Not inside a command: the device lock must be taken first
            with self._device_lock:
                run_killswitch()
        self.log.info("Device returned to safe state")
    except internals.LockNotAcquired:
        self.log.error("Cannot execute 'to_safestate', device is busy")
    except NotImplementedError:
        # Optional _safestate() hook is not provided by the subclass
        pass
    except KeyboardInterrupt:
        self.log.critical("Return to safe state aborted! Please safe-check device MANUALLY!")
        raise
    except Exception as exc:
        self.log.critical("Failed to return to safe state! Please safe-check device MANUALLY!")
        self.log.debug("Exception "+str(type(exc))+" "+str(exc), exc_info=exc)
        self.disconnect()
[docs]
@base_command
def to_defaultstate(self):
    """Initialize the device to its default state.

    Does nothing (and logs nothing) when ``_defaultstate()`` raises
    ``NotImplementedError``.
    """
    try:
        self._defaultstate()
    except NotImplementedError:
        return
    self.log.info("Device initialized to default state")
[docs]
@base_command
def clear_defaultstate(self):
    """Release the device from its default state (e.g. turn off key-lock).

    Does nothing (and logs nothing) when ``_clear_defaultstate()`` raises
    ``NotImplementedError``.
    """
    try:
        self._clear_defaultstate()
    except NotImplementedError:
        return
    self.log.info("Default state settings cleaned up")