Source code for server.events.device

from flask import Blueprint, request, current_app
from flask_socketio import emit, ConnectionRefusedError
from werkzeug.utils import secure_filename
import functools
import collections
import jsonschema

from .. import socketio, connections_devices, tempdirs_devices
from ..exceptions import UniqueIdConflict
from ..resources import devices, entities
from ..protections import device_required_sio
from ..models import Device
from ..proxy_helpers import real_remote_addr
from .update_frontend import emit_full_update, emit_update

bp = Blueprint("device_events", __name__)


# SECURITY: As opposed to client (user), the devices
#               1. do not use time-limited tokens (TODO: Should they?)
#               2. since the use the credentials directly, their modification
#                  would result in disconnect which might not be very useful
#           Thus there should be a single authentication on connect and no
#           other subsequent re-authentication on other events since it would
#           not achieve anything useful. The following situations
#               - disconnect, when device account is deleted from database
#               - disconnect, when password changes (TODO: Yes?)
#               - disconnect and/or block on request from admin
#           can be dealt with "manually" as a reaction on the said situation.
#


[docs] def device_registered(f): @functools.wraps(f) def wrapper(*args, **kwargs): device_id = connections_devices.by_sid[request.sid] if device_id in devices: return f(device_id, *args, **kwargs) else: return False, "Event '{}' refused, not registered".format(request.event["message"]) return wrapper
[docs] @socketio.on("connect", namespace="/device") @device_required_sio # Passes queried Device object as the first argument def on_connect(device): """Accept authenticated connection and add device to sid database.""" ip = real_remote_addr() try: connections_devices.add(device.id_public, request.sid, ip) except UniqueIdConflict: raise ConnectionRefusedError("Another active connection exists") else: emit("hello", device.id_public) current_app.logger.info("device %s/%s connected as %s from %s", device.id_public, device.name, request.sid, ip) return True
[docs] @socketio.on("disconnect", namespace="/device") def on_disconnect(): """Delete device from sid database.""" device_id = connections_devices.by_sid[request.sid] if device_id in devices: del devices[device_id] del tempdirs_devices[device_id] emit_full_update() current_app.logger.info("device %s unregistered (before disconnect)", device_id) connections_devices.remove(id_public=device_id) current_app.logger.info("device %s disconnected as %s from %s", device_id, request.sid, real_remote_addr())
[docs] @socketio.on("register", namespace="/device") def on_register(data): """Register new device - show it to clients.""" if not isinstance(data, dict): return False, "Bad arguments for event 'register'" device_id = connections_devices.by_sid[request.sid] if device_id in devices: return False, "Already registered" data["id"] = device_id data["name"] = Device.by_id(device_id).name commands = data.pop("commands", []) device_status = data.pop("deviceStatus", {}) # Check whether there are no conflicts of command ids key, n = collections.Counter(cmd["id"] for cmd in commands).most_common(1)[0] if n > 1: return False, f"Command id conflict '{key}'" # Try to add the device-, deviceStatus-, command-entities try: devices.add(data) entities["deviceStatus"].add({**device_status, "id": f"{device_id}.status"}) for cmd in commands: cmd["id"] = f"{device_id}.{cmd['id']}" entities["command"].add(cmd) except jsonschema.ValidationError as e: # Removing device removes any other child entities (deviceStatus, commands) if device_id in devices: del devices[device_id] return False, str(e) tempdirs_devices.add(device_id) emit_full_update() current_app.logger.info("device %s registered", device_id) return True, "Ok"
[docs] @socketio.on("unregister", namespace="/device") @device_registered def on_unregister(device_id): """Unregister device - delete all resources and hide it from clients.""" del devices[device_id] del tempdirs_devices[device_id] emit_full_update() current_app.logger.info("device %s unregistered", device_id) return True, "Ok"
[docs] @socketio.on("entity", namespace="/device") @device_registered def on_entity(device_id, name_data): # value, graph, log """Handle new/updated entity emitted by a device.""" if not isinstance(name_data, dict): return False, "Bad arguments for event 'entity'" name, data = name_data.popitem() if not (isinstance(name, str) and isinstance(data, dict) and "id" in data): return False, "Bad arguments for event 'entity'" if name not in entities and name != "device": return False, f"Unknown entity name '{name}'" if name in ["device"]: # , "command" # TODO Is there a reason to forbid modifying commands? return False, f"Entity '{name}' cannot be modified: only during registration" if name in ["datafile"]: return False, f"Entity '{name}' cannot be modified: use a different event" data["id"] = f"{device_id}.{data['id']}" # TODO remove id prepending in device_client.py container = entities[name] try: # A) Handle new entity. Result: emit entity & device with updated refs. if data["id"] not in container: container.add(data) emit_update(device=devices[device_id], **{name: data}) # A) Entity exists, handle update. Result: emit only the updated entity. else: container.update(data) emit_update(**{name: container[data["id"]]}) except jsonschema.ValidationError as e: return False, str(e) return True, "Ok"
[docs] @socketio.on("command_state", namespace="/device") @device_registered def on_command_state(device_id, data): """Forward new default state of parameters of a command to client.""" missing_keys = {"id", "state"}.difference(data.keys()) if missing_keys: return False, "Missing keys " + str(missing_keys) container = entities["command"] command_id = f"{device_id}.{data['id']}" if command_id not in container: return False, "Invalid command id" container.update({"id": command_id, "paramsDefaults": data["state"]}) emit_update(command=container[command_id]) return True, "Ok"
[docs] @socketio.on("datafile", namespace="/device") @device_registered def on_file(device_id, data): """Save data temporarily on server and forward file metadata to client. No ResourceManager is used - each datafile is represented purely by its id saved in 'devices', where id == filename. """ missing_keys = {"id", "ts", "filename", "contents"}.difference(data.keys()) if missing_keys: return False, "Missing keys " + str(missing_keys) contents_size = len(data["contents"]) if contents_size > current_app.config["MAX_DEVICE_DATAFILE_SIZE"]: return False, "Max file size exceeded" elif not data["filename"] or data["filename"] != secure_filename(data["filename"]): current_app.logger.error("rejected file \"%s\" (name) from device %s", data["filename"], device_id) return False, "Unsafe filename '{}'".format(data["filename"]) elif data["filename"].rpartition(".")[-1] not in ["txt", "csv", "dat"]: current_app.logger.error("rejected file \"%s\" (ext) from device %s", data["filename"], device_id) return False, "Unsafe file extension '{}'".format(data["filename"]) # Add or update file data["id"] = f"{device_id}.{data['id']}" contents = data.pop("contents") container = entities["datafile"] if data["id"] not in container: data["url"] = "/{}/{}".format(device_id, data["filename"]) container.add(data) new_file = True else: container.update(data) new_file = False tempdirs_devices.update_file(device_id, data["filename"], contents) # Delete old files when file count is over limit deleted = tempdirs_devices.delete_files_over_limit(device_id, limit=current_app.config["MAX_DEVICE_DATAFILE_COUNT"]) if deleted: for f in container: if f["filename"] in deleted: del container[f["id"]] if deleted: emit_full_update() elif new_file: emit_update(device=devices[device_id], datafile=container[data["id"]]) else: emit_update(datafile=container[data["id"]]) current_app.logger.info("device %s emitted file \"%s\" (~ %.1f kB)", device_id, data["filename"], contents_size/1024) return True, "Ok"