from flask import Blueprint, request, jsonify
from flask_socketio import disconnect
from flask_jwt_extended import get_jwt_identity
from .. import db, connections_users
from ..blacklist_helpers import revoke_all_tokens
from ..models import User
from ..functions import submit_user
from .validations import get_password_errors, get_metadata_errors
from ..protections import user_required, admin_required, content_json
import itertools
bp = Blueprint("user_api", __name__, url_prefix="/api")
[docs]
@bp.route("/user", methods=["GET"])
@user_required # TODO: SECURITY: Maybe admin_required to view all users?
def get_all_users():
users = User.query.all()
return jsonify(users=[u.to_dict() for u in users]), 200
[docs]
@bp.route("/user/<id_>", methods=["GET"])
@user_required # TODO: SECURITY: Maybe admin_required to view other-than-me?
def get_user(id_):
user = User.by_id(id_)
if not user:
return jsonify(message="User not found."), 404
return jsonify(user=user.to_dict()), 200
[docs]
@bp.route("/user", methods=["POST"])
@admin_required
@content_json(["username", "password", "role"])
def create_user():
data = request.get_json()
metadata = {
"username": data["username"],
"role": data["role"],
"email": data.get("email", ""),
}
password = data["password"]
errors = dict(itertools.chain(
get_metadata_errors(**metadata),
get_password_errors(password)
))
if User.by_username(data["username"]):
errors["username"] = "Username already taken."
if not errors:
user = submit_user(password=password, **metadata)
return jsonify(message="User '{}' created.".format(user.username),
user=user.to_dict()), 201
else:
return jsonify(message="User creation failed.", errors=errors), 400
[docs]
@bp.route("/user/<id_>/password", methods=["PUT"])
@user_required
@content_json(["old_password", "new_password"])
def change_password(id_):
if id_ != get_jwt_identity(): # TODO: Maybe make this a decorator?
return jsonify(message="Invalid jwt identity."), 401
user = User.by_id(id_)
if user is None:
return jsonify(message="User not found."), 404
data = request.get_json()
if not user.check_password(data["old_password"]):
return jsonify(message="Old password does not match!"), 401
errors = dict(get_password_errors(data["new_password"]))
if not errors:
user.set_password(data["new_password"])
db.session.commit()
return jsonify(message="Password changed successfully."), 200
else:
return jsonify(message="Password change failed.", errors=errors), 400
[docs]
@bp.route("/user/<id_>", methods=["DELETE"])
@admin_required
def delete_user(id_):
if id_ == get_jwt_identity():
return jsonify(message="Cannot delete yourself."), 403
user = User.by_id(id_)
if not user:
return jsonify(message="User not found."), 404
db.session.delete(user)
db.session.commit()
revoke_all_tokens(id_)
for sid in connections_users.by_pubid.get(id_, []):
disconnect(sid, namespace="/client")
return jsonify(message="User '{}' has been deleted.".format(user.username)), 200
[docs]
@bp.route("/user/<id_>", methods=["PATCH"])
@admin_required
@content_json([])
def modify_user(id_):
user = User.by_id(id_)
if user is None:
return jsonify(message="User not found."), 404
data = request.get_json()
data = {
"username": data.get("username", user.username),
"role": data.get("role", user.role),
"email": data.get("email", user.email),
}
errors = dict(get_metadata_errors(**data))
if data["username"] != user.username and User.by_username(data["username"]):
errors["username"] = "Username already taken."
if not errors:
submit_user(password=None, **data, replace_user=user)
return jsonify(message="User '{}' modified.".format(user.username)), 200
else:
return jsonify(message="User modification failed.", errors=errors), 400