Source code for server.views_main.auth

from flask import Blueprint, jsonify, request, current_app
from flask_jwt_extended import (
    create_access_token, create_refresh_token, get_jwt_identity, jwt_required,
    set_refresh_cookies, set_access_cookies
)

from .. import jwt
from ..protections import content_json
from ..models import User
from ..blacklist_helpers import (token_is_revoked, add_token_to_database, get_user_tokens,
                                revoke_token, unrevoke_token, revoke_all_tokens)
from ..proxy_helpers import real_remote_addr
from ..exceptions import TokenNotFound

bp = Blueprint("views_login", __name__, url_prefix="/auth")


[docs] @bp.route("/login", methods=["POST"]) @content_json(["username", "password"]) def login(): data = request.get_json() ip = real_remote_addr() user = User.query.filter_by(username=data["username"]).first() if not user: current_app.logger.warn("client ?/%s failed login attempt from %s", data["username"], ip) return jsonify(errors={"username": "User not found."}), 404 if not user.check_password(data["password"]): current_app.logger.warn("client %s/%s failed login attempt from %s", user.id_public, user.username, ip) return jsonify(errors={"password": "Invalid password."}), 401 access_token = create_access_token(identity=user.id_public) refresh_token = create_refresh_token(identity=user.id_public) add_token_to_database(access_token) add_token_to_database(refresh_token) response = jsonify( user=user.to_dict(), token_expiry=current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].seconds, token=access_token, ) set_refresh_cookies(response, refresh_token) set_access_cookies(response, access_token) current_app.logger.info("client %s/%s logged in from %s", user.id_public, user.username, ip) return response, 200
[docs] @bp.route("/refresh", methods=["POST"]) @jwt_required(refresh=True) def refresh(): user = User.query.filter_by(id_public=get_jwt_identity()).first() access_token = create_access_token(identity=user.id_public) response = jsonify( user=user.to_dict(), token_expiry=current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].seconds, token=access_token ) add_token_to_database(access_token) set_access_cookies(response, access_token) current_app.logger.info("client %s/%s refreshed token from %s", user.id_public, user.username, real_remote_addr()) return response, 200
[docs] @jwt.token_in_blocklist_loader def check_if_token_revoked(jwt_header, jwt_payload): return token_is_revoked(jwt_payload)
[docs] @bp.route("/token", methods=["GET"]) @jwt_required() def get_tokens(): id_publicentity = get_jwt_identity() all_tokens = get_user_tokens(id_publicentity) ret = [token.to_dict() for token in all_tokens] return jsonify(ret), 200
[docs] @bp.route("/token/<token_id>", methods=["PUT"]) @jwt_required() @content_json(["revoke"]) def modify_token(token_id): return jsonify(message="Endpoint needs security revision"), 501 # TODO It does not seem ok to allow things like "unrevoking refresh token" # based only on access token authorization. Probably ditch the "unrevoke" # action completely, because it makes no (security) sense anyway. revoke = request.get_json()["revoke"] if not isinstance(revoke, bool): return jsonify(message="'revoke' must be a boolean"), 400 # Revoke or unrevoke the token based on what was passed to this function id_public = get_jwt_identity() try: if revoke: revoke_token(token_id, id_public) return jsonify(message="Token revoked"), 200 else: unrevoke_token(token_id, id_public) return jsonify(message="Token unrevoked"), 200 except TokenNotFound: return jsonify(message="The specified token was not found"), 404
[docs] @bp.route("/logout") @jwt_required() def logout(): revoke_all_tokens(get_jwt_identity()) current_app.logger.info("client %s logged out from %s", get_jwt_identity(), real_remote_addr()) return jsonify(message="Logout successful."), 200