from flask import Blueprint, jsonify, request, current_app
from flask_jwt_extended import (
    create_access_token, create_refresh_token, get_jwt_identity, jwt_required,
    set_refresh_cookies, set_access_cookies, unset_jwt_cookies
)
from .. import jwt
from ..protections import content_json
from ..models import User
from ..blacklist_helpers import (token_is_revoked, add_token_to_database, get_user_tokens,
                                 revoke_token, unrevoke_token, revoke_all_tokens)
from ..proxy_helpers import real_remote_addr
from ..exceptions import TokenNotFound
bp = Blueprint("views_login", __name__, url_prefix="/auth")
[docs]
@bp.route("/login", methods=["POST"])
@content_json(["username", "password"])
def login():
    """Authenticate a user and issue a fresh access/refresh token pair.

    Reads ``username`` and ``password`` from the JSON request body
    (presumably validated by ``content_json`` -- confirm in ``protections``).

    Returns:
        ``(response, 200)`` with the user's dict, the access token and its
        lifetime in seconds; both tokens are also set as cookies.
        ``(response, 404)`` when no user has the given username.
        ``(response, 401)`` when the password check fails.
    """
    data = request.get_json()
    ip = real_remote_addr()
    user = User.query.filter_by(username=data["username"]).first()
    # NOTE(review): distinct 404/401 answers reveal whether a username exists;
    # kept as-is because clients may rely on the per-field error keys.
    if not user:
        current_app.logger.warning("client ?/%s failed login attempt from %s",
                                   data["username"], ip)
        return jsonify(errors={"username": "User not found."}), 404
    if not user.check_password(data["password"]):
        current_app.logger.warning("client %s/%s failed login attempt from %s",
                                   user.id_public, user.username, ip)
        return jsonify(errors={"password": "Invalid password."}), 401

    access_token = create_access_token(identity=user.id_public)
    refresh_token = create_refresh_token(identity=user.id_public)
    add_token_to_database(access_token)
    add_token_to_database(refresh_token)

    # total_seconds() covers the whole timedelta; ``.seconds`` silently drops
    # the ``days`` component and would report 0 for a 24 h lifetime.
    expires_in = int(current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].total_seconds())
    response = jsonify(
        user=user.to_dict(),
        token_expiry=expires_in,
        token=access_token,
    )
    set_refresh_cookies(response, refresh_token)
    set_access_cookies(response, access_token)
    current_app.logger.info("client %s/%s logged in from %s",
                            user.id_public, user.username, ip)
    return response, 200
[docs]
@bp.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token for the holder of a valid refresh token.

    Returns:
        ``(response, 200)`` with the user's dict, the new access token and
        its lifetime in seconds; the access token is also set as a cookie.
        ``(response, 401)`` when the identity in the refresh token no longer
        matches any user.
    """
    identity = get_jwt_identity()
    user = User.query.filter_by(id_public=identity).first()
    if user is None:
        # The token is valid but its user is gone; without this guard the
        # attribute access below would raise and surface as a 500.
        current_app.logger.warning(
            "client %s refresh attempt for unknown user from %s",
            identity, real_remote_addr())
        return jsonify(message="User not found."), 401

    access_token = create_access_token(identity=user.id_public)
    # total_seconds() covers the whole timedelta; ``.seconds`` silently drops
    # the ``days`` component.
    expires_in = int(current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].total_seconds())
    response = jsonify(
        user=user.to_dict(),
        token_expiry=expires_in,
        token=access_token
    )
    add_token_to_database(access_token)
    set_access_cookies(response, access_token)
    current_app.logger.info("client %s/%s refreshed token from %s",
                            user.id_public, user.username, real_remote_addr())
    return response, 200
[docs]
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
    """Blocklist callback: report whether the given token has been revoked.

    ``jwt_header`` is accepted to satisfy the callback signature but is not
    used; the decision is delegated to ``token_is_revoked`` on the payload.
    """
    revoked = token_is_revoked(jwt_payload)
    return revoked
[docs]
@bp.route("/token", methods=["GET"])
@jwt_required()
def get_tokens():
    """Return the current identity's tokens as a JSON list.

    Each entry is the ``to_dict()`` form of a token returned by
    ``get_user_tokens`` for the identity found in the request's JWT.
    """
    owner = get_jwt_identity()
    serialized = []
    for entry in get_user_tokens(owner):
        serialized.append(entry.to_dict())
    return jsonify(serialized), 200
[docs]
@bp.route("/token/<token_id>", methods=["PUT"])
@jwt_required()
@content_json(["revoke"])
def modify_token(token_id):
    """Revoke or unrevoke a single token -- currently disabled.

    The endpoint unconditionally answers 501; everything after the first
    ``return`` is unreachable and kept only as the parked implementation
    awaiting the security revision described in the TODO below.

    Args:
        token_id: Identifier of the token taken from the URL path.
    """
    # Short-circuit: the endpoint is switched off until it has been reviewed.
    return jsonify(message="Endpoint needs security revision"), 501
    # TODO It does not seem ok to allow things like "unrevoking refresh token"
    # based only on access token authorization. Probably ditch the "unrevoke"
    # action completely, because it makes no (security) sense anyway.
    revoke = request.get_json()["revoke"]
    if not isinstance(revoke, bool):
        return jsonify(message="'revoke' must be a boolean"), 400
    # Revoke or unrevoke the token based on what was passed to this function
    id_public = get_jwt_identity()
    try:
        if revoke:
            revoke_token(token_id, id_public)
            return jsonify(message="Token revoked"), 200
        else:
            unrevoke_token(token_id, id_public)
            return jsonify(message="Token unrevoked"), 200
    except TokenNotFound:
        return jsonify(message="The specified token was not found"), 404
[docs]
@bp.route("/logout")
@jwt_required()
def logout():
    """Revoke all of the current identity's tokens and clear the JWT cookies.

    Returns:
        ``(response, 200)`` with a confirmation message; the response also
        carries cookie-clearing headers for the cookies set at login.
    """
    # NOTE(review): no ``methods`` given, so Flask serves this on GET only.
    # A state-changing logout would normally be POST -- confirm client usage
    # before tightening.
    identity = get_jwt_identity()
    revoke_all_tokens(identity)

    response = jsonify(message="Logout successful.")
    # login() stores the tokens in cookies; drop them so the browser does not
    # keep sending revoked tokens.
    unset_jwt_cookies(response)
    current_app.logger.info("client %s logged out from %s",
                            identity, real_remote_addr())
    return response, 200