Source code for server.resources.resources
from ..exceptions import UniqueIdConflict
from types import MappingProxyType
import pathlib
import tempfile
[docs]
class EntityContainer:
"""Container for entity objects (JSON compatible). Each entity has to
contain 'id' field with a unique id of type str.
"""
def __init__(self, validator_new, validator_update):
"""
:param jsonschema.IValidator validator_new:
JSON Schema validator for new entities
:param jsonschema.IValidator validator_update:
more lenient JSON Schema validator for existing entities
"""
self.validator_new = validator_new
self.validator_update = validator_update
self._data = {}
self._on_add = lambda _: None
self._on_del = lambda _: None
def __repr__(self):
return self._data.__repr__()
def __contains__(self, entity_id):
return entity_id in self._data
def __getitem__(self, entity_id):
return self._data[entity_id]
def __delitem__(self, entity_id):
del self._data[entity_id]
self._on_del(entity_id)
[docs]
def add(self, entity):
"""Add new entity, which does not yet exist in the container."""
assert entity["id"] not in self._data
self.validator_new.validate(entity)
self._on_add(entity["id"])
self._data[entity["id"]] = entity.copy()
[docs]
def update(self, entity):
"""Update some or all properties of an existing entity."""
assert entity["id"] in self._data
self.validator_update.validate(entity)
self._data[entity["id"]].update(entity)
[docs]
def to_dict(self):
return self._data
[docs]
class ParentEntityContainer(EntityContainer):
"""Entity container with lists of id references to its children. When an
entity is added or removed from the child-container, the list of references
in the appropriate parent-entity is updated.
"""
def __init__(self, validator_new, validator_update, children={}):
"""
:param jsonschema.IValidator validator_new:
validator for new entities
:param jsonschema.IValidator validator_update:
more lenient validator for existing entities
:param Dict[EntityContainer] children:
mapping of field names of the parent entity to the child
containers, e.g. {'graphs': EntityContainer(...)}, these fields
in the parent entity will contain the ids of the children entities
"""
super().__init__(validator_new, validator_update)
def link_child(child_container, field):
def add(id_):
self._data[id_.rpartition(".")[0]][field].append(id_)
child_container._on_add = add
def delete(id_):
self._data[id_.rpartition(".")[0]][field].remove(id_)
child_container._on_del = delete
for field, child_container in children.items():
link_child(child_container, field)
self._children = children
[docs]
def add(self, entity):
for name in self._children:
entity.setdefault(name, [])
super().add(entity)
def __delitem__(self, entity_id):
for name, container in self._children.items():
for child_id in self[entity_id][name]:
# Accessing _data directly to prevent recursion
del container._data[child_id]
super().__delitem__(entity_id)
[docs]
class TempdirManager:
def __init__(self):
self.__data = {}
def __contains__(self, dir_id):
"""Check whether a temporary directory exists for given id."""
return dir_id in self.__data
def __getitem__(self, dir_id):
"""Return absolute path to the temporary directory (str)."""
return str(pathlib.Path(self.__data[dir_id].name).resolve())
def __delitem__(self, dir_id):
"""Delete given temporary directory and all its contents."""
del self.__data[dir_id]
[docs]
def add(self, dir_id):
"""Create a new temporary directory for given id (e.g. device_id). Id
must be unique otherwise UniqueIdConflict is raised.
"""
if dir_id not in self.__data:
self.__data[dir_id] = tempfile.TemporaryDirectory(
prefix="plasmalab_device_{}_".format(dir_id))
else:
raise UniqueIdConflict("Id '{}' already exists".format(dir_id))
[docs]
def update_file(self, dir_id, filename, contents):
"""Create new or overwrite given file in the temporary directory.
"""
(pathlib.Path(self[dir_id]) / filename).write_text(contents)
[docs]
def delete_files_over_limit(self, dir_id, limit):
"""Check number of files in temp directory. If limit was exceeded,
delete old files (based on file-modified timestamp).
Returns list of deleted filenames.
"""
tempdir = pathlib.Path(self.__data[dir_id].name).resolve()
files = sorted(tempdir.iterdir(), key=lambda f: f.stat().st_mtime, reverse=True)
for to_delete in files[limit:]:
to_delete.unlink()
return list(map(lambda f: f.name, files[limit:]))
[docs]
class SocketioConnections:
"""Track active socket.io connections. Each account (user or device) is
allowed only 1 active connection. This structure can be used to
a) query id_public (account) by sid (connection)
b) and vice versa - query sid by id_public
Modification of added entries are not allowed (only add, get, remove) and
unique constraint is enforced on both sid and id_public to prevent errors.
"""
def __init__(self, multiple_connections=False):
self.__multiple_conns = multiple_connections
self.__account_to_sid = {}
self.__sid_to_account = {}
self.__account_to_host = {}
# MappingProxyType represents a read-only view of dict
self.__view_account_to_sid = MappingProxyType(self.__account_to_sid)
self.__view_sid_to_account = MappingProxyType(self.__sid_to_account)
@property
def by_pubid(self):
"""Public id of account -> sid of connection."""
return self.__view_account_to_sid
@property
def by_sid(self):
"""Sid of connection -> public id of account."""
return self.__view_sid_to_account
[docs]
def get_hosts(self, id_public):
"""Public id of account -> remote host addresses."""
return self.__account_to_host[id_public]
[docs]
def add(self, id_public, sid, host=None):
if sid in self.by_sid:
raise UniqueIdConflict("Entry for sid='{}' exists".format(sid))
if id_public not in self.by_pubid:
self.__account_to_sid[id_public] = []
self.__account_to_host[id_public] = []
elif not self.__multiple_conns:
raise UniqueIdConflict("Entry for id_public='{}' exists".format(id_public))
self.__account_to_sid[id_public].append(sid)
self.__sid_to_account[sid] = id_public
self.__account_to_host[id_public].append(host)
[docs]
def remove(self, *, id_public=None, sid=None):
if id_public is not None:
sids = self.__account_to_sid.pop(id_public)
del self.__account_to_host[id_public]
for sid in sids:
del self.__sid_to_account[sid]
elif sid is not None:
id_public = self.__sid_to_account.pop(sid)
index = self.__account_to_sid[id_public].index(sid)
self.__account_to_sid[id_public].pop(index)
self.__account_to_host[id_public].pop(index)
if not self.__account_to_sid[id_public]:
del self.__account_to_sid[id_public]
del self.__account_to_host[id_public]
else:
raise ValueError("Either id_public or sid must be specified")