Source code for server.resources.resources

from ..exceptions import UniqueIdConflict

from types import MappingProxyType
import pathlib
import tempfile


[docs] class EntityContainer: """Container for entity objects (JSON compatible). Each entity has to contain 'id' field with a unique id of type str. """ def __init__(self, validator_new, validator_update): """ :param jsonschema.IValidator validator_new: JSON Schema validator for new entities :param jsonschema.IValidator validator_update: more lenient JSON Schema validator for existing entities """ self.validator_new = validator_new self.validator_update = validator_update self._data = {} self._on_add = lambda _: None self._on_del = lambda _: None def __repr__(self): return self._data.__repr__() def __contains__(self, entity_id): return entity_id in self._data def __getitem__(self, entity_id): return self._data[entity_id] def __delitem__(self, entity_id): del self._data[entity_id] self._on_del(entity_id)
[docs] def add(self, entity): """Add new entity, which does not yet exist in the container.""" assert entity["id"] not in self._data self.validator_new.validate(entity) self._on_add(entity["id"]) self._data[entity["id"]] = entity.copy()
[docs] def update(self, entity): """Update some or all properties of an existing entity.""" assert entity["id"] in self._data self.validator_update.validate(entity) self._data[entity["id"]].update(entity)
[docs] def to_dict(self): return self._data
[docs] class ParentEntityContainer(EntityContainer): """Entity container with lists of id references to its children. When an entity is added or removed from the child-container, the list of references in the appropriate parent-entity is updated. """ def __init__(self, validator_new, validator_update, children={}): """ :param jsonschema.IValidator validator_new: validator for new entities :param jsonschema.IValidator validator_update: more lenient validator for existing entities :param Dict[EntityContainer] children: mapping of field names of the parent entity to the child containers, e.g. {'graphs': EntityContainer(...)}, these fields in the parent entity will contain the ids of the children entities """ super().__init__(validator_new, validator_update) def link_child(child_container, field): def add(id_): self._data[id_.rpartition(".")[0]][field].append(id_) child_container._on_add = add def delete(id_): self._data[id_.rpartition(".")[0]][field].remove(id_) child_container._on_del = delete for field, child_container in children.items(): link_child(child_container, field) self._children = children
[docs] def add(self, entity): for name in self._children: entity.setdefault(name, []) super().add(entity)
def __delitem__(self, entity_id): for name, container in self._children.items(): for child_id in self[entity_id][name]: # Accessing _data directly to prevent recursion del container._data[child_id] super().__delitem__(entity_id)
[docs] class TempdirManager: def __init__(self): self.__data = {} def __contains__(self, dir_id): """Check whether a temporary directory exists for given id.""" return dir_id in self.__data def __getitem__(self, dir_id): """Return absolute path to the temporary directory (str).""" return str(pathlib.Path(self.__data[dir_id].name).resolve()) def __delitem__(self, dir_id): """Delete given temporary directory and all its contents.""" del self.__data[dir_id]
[docs] def add(self, dir_id): """Create a new temporary directory for given id (e.g. device_id). Id must be unique otherwise UniqueIdConflict is raised. """ if dir_id not in self.__data: self.__data[dir_id] = tempfile.TemporaryDirectory( prefix="plasmalab_device_{}_".format(dir_id)) else: raise UniqueIdConflict("Id '{}' already exists".format(dir_id))
[docs] def update_file(self, dir_id, filename, contents): """Create new or overwrite given file in the temporary directory. """ (pathlib.Path(self[dir_id]) / filename).write_text(contents)
[docs] def delete_files_over_limit(self, dir_id, limit): """Check number of files in temp directory. If limit was exceeded, delete old files (based on file-modified timestamp). Returns list of deleted filenames. """ tempdir = pathlib.Path(self.__data[dir_id].name).resolve() files = sorted(tempdir.iterdir(), key=lambda f: f.stat().st_mtime, reverse=True) for to_delete in files[limit:]: to_delete.unlink() return list(map(lambda f: f.name, files[limit:]))
[docs] class SocketioConnections: """Track active socket.io connections. Each account (user or device) is allowed only 1 active connection. This structure can be used to a) query id_public (account) by sid (connection) b) and vice versa - query sid by id_public Modification of added entries are not allowed (only add, get, remove) and unique constraint is enforced on both sid and id_public to prevent errors. """ def __init__(self, multiple_connections=False): self.__multiple_conns = multiple_connections self.__account_to_sid = {} self.__sid_to_account = {} self.__account_to_host = {} # MappingProxyType represents a read-only view of dict self.__view_account_to_sid = MappingProxyType(self.__account_to_sid) self.__view_sid_to_account = MappingProxyType(self.__sid_to_account) @property def by_pubid(self): """Public id of account -> sid of connection.""" return self.__view_account_to_sid @property def by_sid(self): """Sid of connection -> public id of account.""" return self.__view_sid_to_account
[docs] def get_hosts(self, id_public): """Public id of account -> remote host addresses.""" return self.__account_to_host[id_public]
[docs] def add(self, id_public, sid, host=None): if sid in self.by_sid: raise UniqueIdConflict("Entry for sid='{}' exists".format(sid)) if id_public not in self.by_pubid: self.__account_to_sid[id_public] = [] self.__account_to_host[id_public] = [] elif not self.__multiple_conns: raise UniqueIdConflict("Entry for id_public='{}' exists".format(id_public)) self.__account_to_sid[id_public].append(sid) self.__sid_to_account[sid] = id_public self.__account_to_host[id_public].append(host)
[docs] def remove(self, *, id_public=None, sid=None): if id_public is not None: sids = self.__account_to_sid.pop(id_public) del self.__account_to_host[id_public] for sid in sids: del self.__sid_to_account[sid] elif sid is not None: id_public = self.__sid_to_account.pop(sid) index = self.__account_to_sid[id_public].index(sid) self.__account_to_sid[id_public].pop(index) self.__account_to_host[id_public].pop(index) if not self.__account_to_sid[id_public]: del self.__account_to_sid[id_public] del self.__account_to_host[id_public] else: raise ValueError("Either id_public or sid must be specified")