Source code for device_client.device_client

import socketio
import socketio.exceptions

import time
import inspect
import itertools
import base64
import logging
import pathlib

DOT_DIR = pathlib.Path("~/.plasmalab").expanduser()

[docs] class DeviceClient: """ Device client handles the communication between device and a server. """ __namespace = "/device" def __init__(self, name, password, title="N/A", description="N/A", sio=None, logger="main"): """Init new device client using its account credentials (name, password) which are used for authentication on server. Additional metadata such as title and description can be specified. :param str name: device account name :param str name: device account password :param str title: human readable device name used as a title :param str description: semi-long description of the device :param socketio.Client sio: provide a custom sio client for debugging :param logging.Logger logger: a ``logging.Logger`` instance or ``str`` """ self.sio = sio if sio else socketio.Client(ssl_verify=False) # TODO Remove ssl_verify eventually # self.sio = sio if sio else socketio.Client(logger=True) @self.sio.on("connect", namespace=self.__namespace) def __on_connect_handler(): self.logger.info("Connected to server!") if self.__on_connect: self.__on_connect() @self.sio.on("disconnect", namespace=self.__namespace) def __on_disconnect_handler(): self.__registered = False self.logger.info("Disconnected from server!") if self.__on_disconnect: self.__on_disconnect() @self.sio.on("connect_error", namespace=self.__namespace) def __on_connect_handler(error): self.logger.critical("Server: " + str(error)) self.disconnect_and_stop() @self.sio.on("hello", namespace=self.__namespace) def __on_hello_handler(my_id): self.logger.info("Received hello, id=%s", my_id) if self.device_id is None: # First-time-only setup. This will not run on reconnect. self.__setup_commands() self.device_id = my_id elif my_id != self.device_id: # Because the names of command events contain device id, they # would have to be re-registered. Should not really happen. self.logger.critical("My device id has changed, please restart me") self.disconnect_and_stop() @self.sio.on("bye", namespace=self.__namespace) def __on_bye_handler(): self.logger.info("Received bye") self.disconnect_and_stop() @self.sio.on("check_status", namespace=self.__namespace) def __on_check_status_handler(): self.emit("deviceStatus", {**self.__get_status(), "id": "status"}) if isinstance(logger, str): self.logger = logging.getLogger(logger).getChild("dc") elif logger is not None: self.logger = logger.getChild("dc") else: self.logger = logging.getLogger("device_client") self.device_id = None self.name = name self.password = password self.title = title self.description = description self.commands = [] self.__commands_processed = [] self.__registered = False self.__exiting = False self.__id_counter = itertools.count() self.__on_connect = None self.__on_disconnect = None self.__background_tasks = []
[docs] def on_connect(self, func): self.__on_connect = func
[docs] def on_disconnect(self, func): self.__on_disconnect = func
@property def connected(self): return self.sio.connected and self.__namespace in self.sio.namespaces @property def registered(self): if not self.connected: self.__registered = False return self.__registered
[docs] def register_command(self, command, label=None, group=None, inputs=None, pass_self=False): """Register a new command. The order of registration is conserved. :param callable command: function with any number of parameters :param str label: human readable name of the command :param str group: human readable name of group (title of accordion panel) :param list inputs: specify which input elements will be used for each of callable's parameters, default: text input :param pass_self bool: if True, the DeviceClient object will be passed as the first parameter to the callable """ if self.device_id is not None: # Would have to re-perform the __first_time_setup(), too much hassle raise RuntimeError("Commands must be registered before connecting to server") params = list(inspect.signature(command, follow_wrapped=True).parameters) if pass_self: params = params[1:] # Skip the 'device_client' param handler = lambda data: command(self, **data) else: handler = lambda data: command(**data) if inputs and len(inputs) != len(params): raise ValueError("Length of 'inputs' must match command parameters") for i in (inputs or []): i.setdefault("type", "text") # "Local" command data. Server receives processed version created in # ``__setup_commands()``, which has modified id. self.commands.append({ "id": command.__name__, "name": label or command.__name__, "group": group or label or command.__name__, "params": params, "paramsInputs": inputs or (len(params) * [{"type": "text"}]), "handler": handler, # This is used to create event handler. })
[docs] def register_background_task(self, func): """Add a recurring task, time interval = check_readiness_interval. Task is only run when the device is ready. :param callable func: the task, receives this object as its parameter """ self.__background_tasks.append(func)
[docs] def emit(self, entity, data): """Emit data for a new entity or updated data for existing one. :param str entity: events: 'log', 'value', 'graph', ... :param dict data: json-like object with data """ if not self.connected: self.logger.warn("Failed to emit data ('%s'), not connected", entity) return elif not self.registered: self.logger.warn("Failed to emit data ('%s'), not registered", entity) return data.setdefault("id", self.__get_new_id()) data["ts"] = time.time() * 1000 # ms def callback(b, message): if not b: self.logger.error("Server: " + str(message)) self.sio.emit("entity", {entity: data}, namespace=self.__namespace, callback=callback)
[docs] def emit_command_state(self, command, state): """Update the default values of command parameters shown to the user. Also used to report a new state of a type="toggle" command. :param str command: id name of the command :param list state: list of the new default arguments """ self.emit("command", { "id": command, "paramsDefaults": state if isinstance(state, list) else [state], })
[docs] def emit_datafile(self, file, filename=None): """Emit text file with measured data to server. :param str file: absolute path to the text file or text file object as returned by open() :param str filename: (optional) use this to specify a different filename that will be attached to the event """ if not self.connected: self.logger.warn("Failed to emit data ('datafile'), not connected") return elif not self.registered: self.logger.warn("Failed to emit data ('datafile'), not registered") return if isinstance(file, str): file = open(file, "r") if filename is None: filename = file.name data = { "id": filename.partition(".")[0], "ts": time.time() * 1000, # ms "filename": filename, "contents": file.read(), } def callback(b, message): if not b: self.logger.error("Server: " + str(message)) self.sio.emit("datafile", data, namespace=self.__namespace, callback=callback)
[docs] def emit_log(self, record): """ Convenience method that accepts ``logging.LogRecord`` objects and emits them as a 'log' entity via ``DeviceClient.emit()`` method. Use this method to implement a custom ``logging.Handler``. Info-event types are assigned according to logging levels with two exceptions: - 'DEBUG' => ignored - 'INFO' && text ends with '!' => 'success' :param record: object of type ``logging.LogRecord`` """ if not self.registered: return log_type = record.levelname.lower() text = record.getMessage() if log_type == "debug": return elif log_type == "critical": log_type = "error" elif log_type == "info" and text[-1] == "!": log_type = "success" self.emit("log", {"text": text, "type": log_type})
[docs] def keep_server_updated( self, check_readiness, check_readiness_interval, server_address, retry_on_error=False, blocking=True, ): """Connect to the server and periodically check if device is ready. Based on the ready check, device is registered or unregistered. When device is unregistered, it is not visible to frontend and all its data history is deleted. This method exits when server disconnects the device. Alternatively, a ``KeyboardInterrupt`` (CTRL-C) can be issued to break the connection. Also exceptions from ``socketio`` may be raised on connection failure. :param callable check_readiness: function with 0 parameters that returns True/False stating if device is ready :param check_readiness_interval: time interval in seconds :param server_address: eg. 'localhost:5000' :param retry_on_error: set True to ignore errors during connection attempts and automatically retry (only use when you know that the connection should work) :param blocking: if False, method is run as non-bloking and exits as soon as the connection to server is established. The client keeps running in a background thread. Use method ``disconnect_and_stop()`` to terminate it. """ try: self.__establish_connection(server_address, retry_on_error) self.sio.start_background_task( self.__handle_registration, check_readiness, check_readiness_interval ) while blocking and not self.__exiting: self.sio.sleep(1) except KeyboardInterrupt: self.logger.info("Received keyboard interrupt") self.disconnect_and_stop()
[docs] def disconnect_and_stop(self): """Signalize the client to stop and exit. This will terminate the main loop in ``keep_server_updated()`` and any other related threads. """ self.logger.info("Exiting...") self.sio.disconnect() self.__exiting = True
def __get_new_id(self): return "{:08d}".format(next(self.__id_counter)) def __get_status(self): restartPending = not (DOT_DIR / f"/.{self.name}.updated").exists() if restartPending: self.logger.debug("Status check: restart pending") return { "ts": time.time() * 1000, "restartPending": restartPending, } def __handle_registration(self, check_readiness, check_readiness_interval): while not self.__exiting: ready = check_readiness() # Do nothing until 'hello' with device_id is received and device is ready if self.device_id and ready and self.connected: if not self.__registered: self.__register() if self.__registered: # __register() could have failed for task in self.__background_tasks: task(self) elif not ready and self.__registered: self.__unregister() self.sio.sleep(check_readiness_interval) def __register(self): """Register device (create server-side entry of commands, ...).""" data = { "id": self.device_id, "ts": time.time() * 1000, # ms "title": self.title, "description": self.description, "deviceStatus": self.__get_status(), "commands": self.__commands_processed, } for cmd in data["commands"]: cmd["ts"] = data["ts"] try: success, message = self.sio.call( "register", data, namespace=self.__namespace, timeout=15) except socketio.exceptions.TimeoutError: self.logger.critical("Event 'register' timed out, no response from server") self.disconnect_and_stop() except socketio.exceptions.BadNamespaceError: self.logger.error("Failed to emit 'register', not connected") else: if success: self.__registered = True self.logger.info("Registered %d commands", len(self.__commands_processed)) else: self.logger.error("Server: " + str(message)) self.disconnect_and_stop() def __unregister(self): """Unregister device (delete all server-side data).""" try: success, message = self.sio.call( "unregister", namespace=self.__namespace, timeout=15) except socketio.exceptions.TimeoutError: self.logger.critical("Event 'unregister' timed out, no response from server") self.disconnect_and_stop() except socketio.exceptions.BadNamespaceError: self.logger.error("Failed to emit 'unregister', not connected") self.__registered = False else: if success: self.__registered = False self.logger.info("Unregistered") else: # Only possible error is that the device is not registered self.logger.error("Server: " + str(message)) self.__registered = False def __establish_connection(self, server_address, retry_on_error): """Try to connect to the server, retry in regular intervals.""" def auth_header(username, password): return { "Authorization": "Basic " + base64.b64encode( "{}:{}".format(username, password).encode("utf-8") ).decode("ascii") } while not self.sio.connected and not self.__exiting: try: self.sio.connect( server_address, namespaces=[self.__namespace], headers=auth_header(self.name, self.password), ) return except socketio.exceptions.ConnectionError as e: self.logger.error("ConnectionError: %s", e) if not retry_on_error: raise time.sleep(4) def __setup_commands(self): """Process registered commands - set up event handlers. """ def process_command(command): handler = command.pop("handler") self.sio.on(command["id"], handler=handler, namespace=self.__namespace) return command self.__commands_processed = list(map(process_command, self.commands))