"""Source code for server.models."""

import time
import uuid

from werkzeug.security import generate_password_hash, check_password_hash

from . import db

# Role names recognised for ``User.role``.  The code visible in this module
# does not validate against this set; User.is_admin only compares with "admin".
roles = {"admin", "user"}  # valid roles


class User(db.Model):
    """Account with a unique username, a role and a hashed password.

    ``id`` is the private integer primary key.  ``id_public`` is a random
    UUID4 string generated in ``__init__``; it is the identifier returned by
    ``to_dict`` and used for lookups in ``by_id``.
    """

    # Assigned automatically
    id = db.Column(db.Integer, primary_key=True)
    id_public = db.Column(db.String(36), unique=True, nullable=False)

    # Modifiable
    username = db.Column(db.String(30), unique=True, nullable=False)
    role = db.Column(db.String(20), nullable=False)
    email = db.Column(db.String(50))

    # Modifiable via set_password()
    password = db.Column(db.String(200), nullable=False)

    def __init__(self, **kwargs):
        """Initialise columns from ``kwargs`` and assign a new public id.

        Any ``id_public`` passed in ``kwargs`` is overwritten.
        """
        super().__init__(**kwargs)
        self.id_public = str(uuid.uuid4())

    def set_password(self, password):
        """Store a salted hash of ``password`` instead of the plain text.

        :param password: plain-text password
        """
        # The bare "sha256" method is a single salted round; newer Werkzeug
        # releases deprecate and then reject it.  PBKDF2 is accepted by old
        # and new releases alike and is much costlier to brute force.  The
        # method is embedded in the stored hash, so check_password() needs
        # no change.
        self.password = generate_password_hash(password, method="pbkdf2:sha256")

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash.

        :param password: plain-text password to verify
        """
        return check_password_hash(self.password, password)

    def modify(self, **kwargs):
        """Modify some or all modifiable properties.

        Recognised keys are ``username``, ``role`` and ``email``; a key that
        is absent leaves the current value in place.  Other keys are ignored.
        """
        self.username = kwargs.get("username", self.username)
        self.role = kwargs.get("role", self.role)
        self.email = kwargs.get("email", self.email)

    def to_dict(self):
        """Return user as a dictionary (public id, no password hash)."""
        return {
            "id": self.id_public,
            "username": self.username,
            "role": self.role,
            "email": self.email,
        }

    def get_active_sessions(self):
        """Return a list of this user's sessions whose ``is_active()`` is true."""
        # TODO Can/should this be done more efficiently with a query?
        return [session for session in self.sessions if session.is_active()]

    @property
    def is_admin(self):
        """True when the user's role is exactly ``"admin"``."""
        return self.role == "admin"

    @classmethod
    def by_id(cls, id_public):
        """Find user by public id; return None when there is no match."""
        return cls.query.filter_by(id_public=id_public).first()

    @classmethod
    def by_username(cls, username):
        """Find user by username; return None when there is no match."""
        return cls.query.filter_by(username=username).first()
class Token(db.Model):
    """Token record stored in the ``token_blacklist`` table.

    Each row keeps the token's ``jti``, its type, the identity it belongs to,
    a ``revoked`` flag and an expiry timestamp.
    """

    __tablename__ = "token_blacklist"

    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(db.String(36), nullable=False)
    token_type = db.Column(db.String(10), nullable=False)
    user_identity = db.Column(db.String(50), nullable=False)
    revoked = db.Column(db.Boolean, nullable=False)
    expires = db.Column(db.DateTime, nullable=False)

    def to_dict(self):
        """Return token as a dictionary (with private id).

        ``expires`` is returned as stored, i.e. not converted to a string.
        """
        # TODO Use public id?
        exported = (
            "id",
            "jti",
            "token_type",
            "user_identity",
            "revoked",
            "expires",
        )
        return {field: getattr(self, field) for field in exported}
class Device(db.Model):
    """Device with a unique name, an IPv4 address string and a hashed password.

    ``id`` is the private integer primary key.  ``id_public`` is a random
    UUID4 string generated in ``__init__``; it is the identifier returned by
    ``to_dict`` and used for lookups in ``by_id``.
    """

    # Assigned automatically
    id = db.Column(db.Integer, primary_key=True)
    id_public = db.Column(db.String(36), unique=True, nullable=False)

    # Modifiable
    name = db.Column(db.String(30), unique=True, nullable=False)
    ipv4 = db.Column(db.String(15), nullable=False)  # may be unnecessary

    # Modifiable via set_password()
    password = db.Column(db.String(200), nullable=False)

    def __init__(self, **kwargs):
        """Initialise columns from ``kwargs`` and assign a new public id.

        Any ``id_public`` passed in ``kwargs`` is overwritten.
        """
        super().__init__(**kwargs)
        self.id_public = str(uuid.uuid4())

    def set_password(self, password):
        """Store a salted hash of ``password`` instead of the plain text.

        :param password: plain-text password
        """
        # The bare "sha256" method is a single salted round; newer Werkzeug
        # releases deprecate and then reject it.  PBKDF2 is accepted by old
        # and new releases alike and is much costlier to brute force.  The
        # method is embedded in the stored hash, so check_password() needs
        # no change.
        self.password = generate_password_hash(password, method="pbkdf2:sha256")

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash.

        :param password: plain-text password to verify
        """
        return check_password_hash(self.password, password)

    def modify(self, **kwargs):
        """Modify some or all modifiable properties.

        Recognised keys are ``name`` and ``ipv4``; a key that is absent
        leaves the current value in place.  Other keys are ignored.
        """
        self.name = kwargs.get("name", self.name)
        self.ipv4 = kwargs.get("ipv4", self.ipv4)

    def to_dict(self):
        """Return device as a dictionary (public id, no password hash)."""
        return {
            "id": self.id_public,
            "name": self.name,
            "ipv4": self.ipv4,
        }

    @classmethod
    def by_id(cls, id_public):
        """Find device by public id; return None when there is no match."""
        return cls.query.filter_by(id_public=id_public).first()

    @classmethod
    def by_name(cls, name):
        """Find device by name; return None when there is no match."""
        return cls.query.filter_by(name=name).first()
# Association table pairing device rows with experiment rows; both foreign
# keys together form the composite primary key.
rel_experiment_device = db.Table(
    "rel_experiment_device",
    db.Column(
        "device_id",
        db.Integer,
        db.ForeignKey("device.id"),
        primary_key=True,
    ),
    db.Column(
        "experiment_id",
        db.Integer,
        db.ForeignKey("experiment.id"),
        primary_key=True,
    ),
)
class Experiment(db.Model):
    """Named experiment that groups devices and owns sessions.

    ``devices`` is a many-to-many relationship through
    ``rel_experiment_device`` (devices get an ``experiments`` backref);
    ``sessions`` lists the sessions pointing at this experiment (sessions get
    an ``experiment`` backref).
    """

    # Assigned automatically
    id = db.Column(db.Integer, primary_key=True)
    id_public = db.Column(db.String(36), unique=True, nullable=False)

    # Modifiable
    name = db.Column(db.String(30), unique=True, nullable=False)
    devices = db.relationship(
        "Device",
        secondary=rel_experiment_device,
        lazy="subquery",
        backref=db.backref("experiments", lazy=True),
    )
    sessions = db.relationship(
        "Session",
        lazy="subquery",
        backref=db.backref("experiment", lazy=True),
    )

    def __init__(self, **kwargs):
        """Initialise from ``kwargs``, then assign a freshly generated public id."""
        super().__init__(**kwargs)
        public_id = uuid.uuid4()
        self.id_public = str(public_id)

    def modify(self, **kwargs):
        """Update ``name`` and/or ``devices``; absent keys keep their value."""
        for field in ("name", "devices"):
            setattr(self, field, kwargs.get(field, getattr(self, field)))

    def to_dict(self):
        """Return experiment as a dictionary keyed by public ids."""
        payload = {"id": self.id_public, "name": self.name}
        payload["devices"] = [device.id_public for device in self.devices]
        payload["sessions"] = [session.id_public for session in self.sessions]
        return payload

    @classmethod
    def by_id(cls, id_public):
        """Look up an experiment by its public id (None if absent)."""
        matches = cls.query.filter_by(id_public=id_public)
        return matches.first()

    @classmethod
    def by_name(cls, name):
        """Look up an experiment by its name (None if absent)."""
        matches = cls.query.filter_by(name=name)
        return matches.first()
# Association table pairing user rows with session rows; both foreign keys
# together form the composite primary key.
rel_session_user = db.Table(
    "rel_session_user",
    db.Column(
        "user_id",
        db.Integer,
        db.ForeignKey("user.id"),
        primary_key=True,
    ),
    db.Column(
        "session_id",
        db.Integer,
        db.ForeignKey("session.id"),
        primary_key=True,
    ),
)
class Session(db.Model):
    """Time-boxed session of one experiment with a set of participating users.

    ``begin`` and ``end`` are compared against ``int(time.time() * 1000)``,
    i.e. they are treated as milliseconds since the Unix epoch.  ``experiment``
    is provided by the backref declared on ``Experiment.sessions``.
    """

    # Assigned automatically
    id = db.Column(db.Integer, primary_key=True)
    id_public = db.Column(db.String(36), unique=True, nullable=False)

    # Modifiable
    name = db.Column(db.String(30))
    # NOTE(review): millisecond timestamps exceed the 32-bit range; confirm
    # the database backend's INTEGER type is wide enough.
    begin = db.Column(db.Integer, nullable=False)
    end = db.Column(db.Integer, nullable=False)
    experiment_id = db.Column(
        db.Integer,
        db.ForeignKey('experiment.id'),
        nullable=False,
    )
    users = db.relationship(
        "User",
        secondary=rel_session_user,
        lazy="subquery",
        backref=db.backref("sessions", lazy=True),
    )

    def __init__(self, **kwargs):
        """Initialise columns from ``kwargs`` and assign a new public id.

        Any ``id_public`` passed in ``kwargs`` is overwritten.
        """
        super().__init__(**kwargs)
        self.id_public = str(uuid.uuid4())

    def is_active(self):
        """Return True if the current time lies strictly between begin and end."""
        return self.begin < int(time.time() * 1000) < self.end

    def modify(self, **kwargs):
        """Modify some or all modifiable properties.

        Recognised keys are ``name``, ``begin``, ``end``, ``experiment`` and
        ``users``; a key that is absent leaves the current value in place.
        """
        self.name = kwargs.get("name", self.name)
        self.begin = kwargs.get("begin", self.begin)
        self.end = kwargs.get("end", self.end)
        self.experiment = kwargs.get("experiment", self.experiment)
        self.users = kwargs.get("users", self.users)

    def to_dict(self):
        """Return session as a dictionary (public id)."""
        return {
            "id": self.id_public,
            "name": self.name,
            "begin": self.begin,
            "end": self.end,
            "experiment": self.experiment.id_public,
            "users": [u.id_public for u in self.users],
        }

    @classmethod
    def by_id(cls, id_public):
        """Find session by public id; return None when there is no match."""
        return cls.query.filter_by(id_public=id_public).first()

    @classmethod
    def all_active(cls, user=None, device=None):
        """Get active sessions belonging to a user and/or device.

        If neither user nor device is specified, return all active sessions.

        :param user: server.models.User or None
        :param device: server.models.Device or None
        :return: set of server.models.Session
        """
        # Take the clock once so both bounds are checked against the same instant.
        now = int(time.time() * 1000)
        active = cls.query.filter(db.and_(cls.begin < now, now < cls.end))
        if user is not None:
            active = (s for s in active if user in s.users)
        if device is not None:
            # A session has no ``devices`` of its own; devices are attached
            # to the session's experiment.
            active = (s for s in active if device in s.experiment.devices)
        return set(active)
# Association table pairing user rows with usergroup rows; both foreign keys
# together form the composite primary key.
rel_usergroup_user = db.Table(
    "rel_usergroup_user",
    db.Column(
        "user_id",
        db.Integer,
        db.ForeignKey("user.id"),
        primary_key=True,
    ),
    db.Column(
        "usergroup_id",
        db.Integer,
        db.ForeignKey("usergroup.id"),
        primary_key=True,
    ),
)
class Usergroup(db.Model):
    """Named group of users with free-text description and style data.

    ``users`` is a many-to-many relationship through ``rel_usergroup_user``
    (users get a ``usergroups`` backref).  ``description`` and ``style_json``
    are stored as plain text columns.
    """

    # Assigned automatically
    id = db.Column(db.Integer, primary_key=True)
    id_public = db.Column(db.String(36), unique=True, nullable=False)

    # Modifiable
    name = db.Column(db.String(30), unique=True, nullable=False)
    users = db.relationship(
        "User",
        secondary=rel_usergroup_user,
        lazy="subquery",
        backref=db.backref("usergroups", lazy=True),
    )
    description = db.Column(db.Text)
    style_json = db.Column(db.Text)

    def __init__(self, **kwargs):
        """Initialise from ``kwargs``, then assign a freshly generated public id."""
        super().__init__(**kwargs)
        public_id = uuid.uuid4()
        self.id_public = str(public_id)

    def modify(self, **kwargs):
        """Update any of name, users, description, style_json.

        Keys that are absent from ``kwargs`` keep their current value.
        """
        for field in ("name", "users", "description", "style_json"):
            setattr(self, field, kwargs.get(field, getattr(self, field)))

    def to_dict(self):
        """Return usergroup as a dictionary keyed by public ids."""
        return dict(
            id=self.id_public,
            name=self.name,
            users=[member.id_public for member in self.users],
            description=self.description,
            style_json=self.style_json,
        )

    @classmethod
    def by_id(cls, id_public):
        """Look up a usergroup by its public id (None if absent)."""
        matches = cls.query.filter_by(id_public=id_public)
        return matches.first()

    @classmethod
    def by_name(cls, name):
        """Look up a usergroup by its name (None if absent)."""
        matches = cls.query.filter_by(name=name)
        return matches.first()