import time
import uuid
from werkzeug.security import generate_password_hash, check_password_hash
from . import db
roles = {"admin", "user"} # valid roles
[docs]
class User(db.Model):
# Assigned automatically
id = db.Column(db.Integer, primary_key=True)
id_public = db.Column(db.String(36), unique=True, nullable=False)
# Modifiable
username = db.Column(db.String(30), unique=True, nullable=False)
role = db.Column(db.String(20), nullable=False)
email = db.Column(db.String(50))
# Modifiable via set_password()
password = db.Column(db.String(200), nullable=False)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.id_public = str(uuid.uuid4())
[docs]
def set_password(self, password):
"""Create hashed password."""
self.password = generate_password_hash(password, method="sha256")
[docs]
def check_password(self, password):
"""Check hashed password."""
return check_password_hash(self.password, password)
[docs]
def modify(self, **kwargs):
"""Modify some or all modifiable properties."""
self.username = kwargs.get("username", self.username)
self.role = kwargs.get("role", self.role)
self.email = kwargs.get("email", self.email)
[docs]
def to_dict(self):
"""Return user as a dictionary (public id)."""
return {
"id": self.id_public,
"username": self.username,
"role": self.role,
"email": self.email,
}
[docs]
def get_active_sessions(self):
"""Get all active session containing this user."""
# TODO Can/should this be done more efficiently with a query?
return list(filter(lambda s: s.is_active(), self.sessions))
@property
def is_admin(self):
return self.role == "admin"
[docs]
@classmethod
def by_id(cls, id_public):
"""Find user by public id."""
return cls.query.filter_by(id_public=id_public).first()
[docs]
@classmethod
def by_username(cls, username):
"""Find user by username."""
return cls.query.filter_by(username=username).first()
[docs]
class Token(db.Model):
__tablename__ = "token_blacklist"
id = db.Column(db.Integer, primary_key=True)
jti = db.Column(db.String(36), nullable=False)
token_type = db.Column(db.String(10), nullable=False)
user_identity = db.Column(db.String(50), nullable=False)
revoked = db.Column(db.Boolean, nullable=False)
expires = db.Column(db.DateTime, nullable=False)
[docs]
def to_dict(self):
"""Return token as a dictionary (with private id).""" # TODO Use public id?
return {
"id": self.id,
"jti": self.jti,
"token_type": self.token_type,
"user_identity": self.user_identity,
"revoked": self.revoked,
"expires": self.expires,
}
[docs]
class Device(db.Model):
# Assigned automatically
id = db.Column(db.Integer, primary_key=True)
id_public = db.Column(db.String(36), unique=True, nullable=False)
# Modifiable
name = db.Column(db.String(30), unique=True, nullable=False)
ipv4 = db.Column(db.String(15), nullable=False) # may be unnecessary
# Modifiable via set_password()
password = db.Column(db.String(200), nullable=False)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.id_public = str(uuid.uuid4())
[docs]
def set_password(self, password):
"""Create hashed password."""
self.password = generate_password_hash(password, method="sha256")
[docs]
def check_password(self, password):
"""Check hashed password."""
return check_password_hash(self.password, password)
[docs]
def modify(self, **kwargs):
"""Modify some or all modifiable properties."""
self.name = kwargs.get("name", self.name)
self.ipv4 = kwargs.get("ipv4", self.ipv4)
[docs]
def to_dict(self):
"""Return device as a dictionary (public id)."""
return {
"id": self.id_public,
"name": self.name,
"ipv4": self.ipv4,
}
[docs]
@classmethod
def by_id(cls, id_public):
"""Find device by public id."""
return cls.query.filter_by(id_public=id_public).first()
[docs]
@classmethod
def by_name(cls, name):
"""Find device by name."""
return cls.query.filter_by(name=name).first()
rel_experiment_device = db.Table(
"rel_experiment_device",
db.Column("device_id", db.Integer, db.ForeignKey("device.id"), primary_key=True),
db.Column("experiment_id", db.Integer, db.ForeignKey("experiment.id"), primary_key=True)
)
[docs]
class Experiment(db.Model):
# Assigned automatically
id = db.Column(db.Integer, primary_key=True)
id_public = db.Column(db.String(36), unique=True, nullable=False)
# Modifiable
name = db.Column(db.String(30), unique=True, nullable=False)
devices = db.relationship(
"Device",
secondary=rel_experiment_device,
lazy="subquery",
backref=db.backref("experiments", lazy=True),
)
sessions = db.relationship(
"Session",
lazy="subquery",
backref=db.backref("experiment", lazy=True),
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.id_public = str(uuid.uuid4())
[docs]
def modify(self, **kwargs):
"""Modify some or all modifiable properties."""
self.name = kwargs.get("name", self.name)
self.devices = kwargs.get("devices", self.devices)
[docs]
def to_dict(self):
"""Return experiment as a dictionary (public id)."""
return {
"id": self.id_public,
"name": self.name,
"devices": [d.id_public for d in self.devices],
"sessions": [s.id_public for s in self.sessions],
}
[docs]
@classmethod
def by_id(cls, id_public):
"""Find experiment by public id."""
return cls.query.filter_by(id_public=id_public).first()
[docs]
@classmethod
def by_name(cls, name):
"""Find device by name."""
return cls.query.filter_by(name=name).first()
rel_session_user = db.Table(
"rel_session_user",
db.Column("user_id", db.Integer, db.ForeignKey("user.id"), primary_key=True),
db.Column("session_id", db.Integer, db.ForeignKey("session.id"), primary_key=True)
)
[docs]
class Session(db.Model):
# Assigned automatically
id = db.Column(db.Integer, primary_key=True)
id_public = db.Column(db.String(36), unique=True, nullable=False)
# Modifiable
name = db.Column(db.String(30))
begin = db.Column(db.Integer, nullable=False)
end = db.Column(db.Integer, nullable=False)
experiment_id = db.Column(
db.Integer,
db.ForeignKey('experiment.id'),
nullable=False,
)
users = db.relationship(
"User",
secondary=rel_session_user,
lazy="subquery",
backref=db.backref("sessions", lazy=True),
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.id_public = str(uuid.uuid4())
[docs]
def is_active(self):
return self.begin < int(time.time() * 1000) < self.end
[docs]
def modify(self, **kwargs):
"""Modify some or all modifiable properties."""
self.name = kwargs.get("name", self.name)
self.begin = kwargs.get("begin", self.begin)
self.end = kwargs.get("end", self.end)
self.experiment = kwargs.get("experiment", self.experiment)
self.users = kwargs.get("users", self.users)
[docs]
def to_dict(self):
"""Return session as a dictionary (public id)."""
return {
"id": self.id_public,
"name": self.name,
"begin": self.begin,
"end": self.end,
"experiment": self.experiment.id_public,
"users": [u.id_public for u in self.users],
}
[docs]
@classmethod
def by_id(cls, id_public):
"""Find session by public id."""
return cls.query.filter_by(id_public=id_public).first()
[docs]
@classmethod
def all_active(cls, user=None, device=None):
"""Get active sessions belonging to a user and/or device. If neither
user nor device is specified, return all active sessions.
:param user: server.models.User or None
:param device: server.models.Device or None
:return: set of server.models.Session
"""
active = cls.query.filter(db.and_(
cls.begin < int(time.time() * 1000),
int(time.time() * 1000) < cls.end
))
if user:
active = filter(lambda s: user in s.users, active)
if device:
active = filter(lambda s: device in s.devices, active)
return set(active)
rel_usergroup_user = db.Table(
"rel_usergroup_user",
db.Column("user_id", db.Integer, db.ForeignKey("user.id"), primary_key=True),
db.Column("usergroup_id", db.Integer, db.ForeignKey("usergroup.id"), primary_key=True)
)
[docs]
class Usergroup(db.Model):
# Assigned automatically
id = db.Column(db.Integer, primary_key=True)
id_public = db.Column(db.String(36), unique=True, nullable=False)
# Modifiable
name = db.Column(db.String(30), unique=True, nullable=False)
users = db.relationship(
"User",
secondary=rel_usergroup_user,
lazy="subquery",
backref=db.backref("usergroups", lazy=True),
)
description = db.Column(db.Text)
style_json = db.Column(db.Text)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.id_public = str(uuid.uuid4())
[docs]
def modify(self, **kwargs):
"""Modify some or all modifiable properties."""
self.name = kwargs.get("name", self.name)
self.users = kwargs.get("users", self.users)
self.description = kwargs.get("description", self.description)
self.style_json = kwargs.get("style_json", self.style_json)
[docs]
def to_dict(self):
"""Return usergroup as a dictionary (public id)."""
return {
"id": self.id_public,
"name": self.name,
"users": [u.id_public for u in self.users],
"description": self.description,
"style_json": self.style_json,
}
[docs]
@classmethod
def by_id(cls, id_public):
"""Find usergroup by public id."""
return cls.query.filter_by(id_public=id_public).first()
[docs]
@classmethod
def by_name(cls, name):
"""Find usergroup by name."""
return cls.query.filter_by(name=name).first()