Source code for server.blacklist_helpers

from datetime import datetime

from sqlalchemy.orm.exc import NoResultFound
from flask import current_app
from flask_jwt_extended import decode_token

from .exceptions import TokenNotFound
from .models import Token
from . import db


def _epoch_utc_to_datetime(epoch_utc):
    """
    Helper function for converting epoch timestamps (as stored in JWTs) into
    python datetime objects (which are easier to use with sqlalchemy).
    """
    return datetime.fromtimestamp(epoch_utc)


[docs] def add_token_to_database(encoded_token): """ Add a new token to the database. It is not revoked when it is added. """ decoded_token = decode_token(encoded_token) db_token = Token( jti=decoded_token["jti"], token_type=decoded_token["type"], user_identity=decoded_token[current_app.config["JWT_IDENTITY_CLAIM"]], expires=_epoch_utc_to_datetime(decoded_token["exp"]), revoked=False, ) db.session.add(db_token) db.session.commit()
[docs] def token_is_revoked(decoded_token): """ Check if token is revoked. If it does not exist in the database, consider it revoked. """ jti = decoded_token["jti"] try: token = Token.query.filter_by(jti=jti).one() return token.revoked except NoResultFound: return True
[docs] def get_user_tokens(user_identity): """ Return all tokens belonging to given user. """ return Token.query.filter_by(user_identity=user_identity).all()
[docs] def revoke_token(token_id, user): """ Revoke token. If it does not exist in database, raise TokenNotFound. """ try: token = Token.query.filter_by(id=token_id, user_identity=user).one() token.revoked = True db.session.commit() except NoResultFound: raise TokenNotFound("Could not find the token {}".format(token_id))
[docs] def revoke_all_tokens(user_identity): """ Revoke all tokens belonging to given user. """ tokens = Token.query.filter_by(user_identity=user_identity).all() for t in tokens: t.revoked = True db.session.commit()
[docs] def unrevoke_token(token_id, user): """ Unrevoke given token. Raise TokenNotFound if it does not exist in the database. """ try: token = Token.query.filter_by(id=token_id, user_identity=user).one() token.revoked = False db.session.commit() except NoResultFound: raise TokenNotFound("Could not find the token {}".format(token_id))
[docs] def prune_database(): """ Delete all expired tokens from the database. """ now = datetime.now() expired = Token.query.filter(Token.expires < now).all() for token in expired: db.session.delete(token) db.session.commit()