Source code for server.blacklist_helpers
from datetime import datetime
from sqlalchemy.orm.exc import NoResultFound
from flask import current_app
from flask_jwt_extended import decode_token
from .exceptions import TokenNotFound
from .models import Token
from . import db
def _epoch_utc_to_datetime(epoch_utc):
    """
    Turn an epoch timestamp (the form in which JWTs store times such as
    ``exp``) into a python ``datetime``, which is more convenient to store
    and compare through sqlalchemy.

    The conversion calls ``datetime.fromtimestamp`` without a ``tz``
    argument, so the returned datetime is naive and expressed in the
    local timezone of the running process.
    """
    converted = datetime.fromtimestamp(epoch_utc)
    return converted
[docs]
def add_token_to_database(encoded_token):
    """
    Decode ``encoded_token`` and store a matching ``Token`` row.

    The row records the token's ``jti`` and ``type`` claims, the identity
    found under the claim named by the application's ``JWT_IDENTITY_CLAIM``
    setting, and the ``exp`` claim converted to a datetime. New rows are
    always stored with ``revoked=False``. The session is committed before
    returning.
    """
    claims = decode_token(encoded_token)
    # Collect the column values first, then build the model in one step.
    columns = {
        "jti": claims["jti"],
        "token_type": claims["type"],
        "user_identity": claims[current_app.config["JWT_IDENTITY_CLAIM"]],
        "expires": _epoch_utc_to_datetime(claims["exp"]),
        "revoked": False,
    }
    session = db.session
    session.add(Token(**columns))
    session.commit()
[docs]
def token_is_revoked(decoded_token):
    """
    Report whether the token described by ``decoded_token`` is revoked.

    The token is looked up by its ``jti`` claim. A token with no matching
    database row is reported as revoked (``True``); otherwise the stored
    ``revoked`` flag is returned.
    """
    lookup = Token.query.filter_by(jti=decoded_token["jti"])
    try:
        return lookup.one().revoked
    except NoResultFound:
        # Unknown tokens are treated the same as revoked ones.
        return True
[docs]
def get_user_tokens(user_identity):
    """
    Fetch every stored token whose ``user_identity`` equals the given value.

    Returns whatever the query's ``.all()`` call produces.
    """
    owned_by_user = Token.query.filter_by(user_identity=user_identity)
    return owned_by_user.all()
[docs]
def revoke_token(token_id, user):
    """
    Mark the token with the given id, owned by ``user``, as revoked.

    Args:
        token_id: Value matched against ``Token.id``.
        user: Value matched against ``Token.user_identity``. A token with
            the right id but a different owner is treated as not found.

    Raises:
        TokenNotFound: If no token matches both ``token_id`` and ``user``.
            The underlying ``NoResultFound`` is attached as the cause.
    """
    # Only the lookup can legitimately signal "not found", so keep the try
    # body limited to it; failures while committing propagate unchanged.
    try:
        token = Token.query.filter_by(id=token_id, user_identity=user).one()
    except NoResultFound as exc:
        raise TokenNotFound(
            "Could not find the token {}".format(token_id)
        ) from exc
    token.revoked = True
    db.session.commit()
[docs]
def revoke_all_tokens(user_identity):
    """
    Set the ``revoked`` flag on every stored token whose ``user_identity``
    equals the given value, then commit the session once.

    The commit is issued even when the user has no tokens.
    """
    for owned_token in Token.query.filter_by(user_identity=user_identity).all():
        owned_token.revoked = True
    db.session.commit()
[docs]
def unrevoke_token(token_id, user):
    """
    Clear the revoked flag on the token with the given id, owned by ``user``.

    Args:
        token_id: Value matched against ``Token.id``.
        user: Value matched against ``Token.user_identity``. A token with
            the right id but a different owner is treated as not found.

    Raises:
        TokenNotFound: If no token matches both ``token_id`` and ``user``.
            The underlying ``NoResultFound`` is attached as the cause.
    """
    # Only the lookup can legitimately signal "not found", so keep the try
    # body limited to it; failures while committing propagate unchanged.
    try:
        token = Token.query.filter_by(id=token_id, user_identity=user).one()
    except NoResultFound as exc:
        raise TokenNotFound(
            "Could not find the token {}".format(token_id)
        ) from exc
    token.revoked = False
    db.session.commit()
[docs]
def prune_database():
    """
    Remove every token whose ``expires`` value lies before the current time.

    The cutoff is taken from ``datetime.now()`` (a naive, local-time value).
    Matching rows are deleted one by one through the session, followed by a
    single commit.
    """
    cutoff = datetime.now()
    stale_tokens = Token.query.filter(Token.expires < cutoff)
    session = db.session
    for stale in stale_tokens.all():
        session.delete(stale)
    session.commit()