Source code for server.views_api.user

from flask import Blueprint, request, jsonify
from flask_socketio import disconnect
from flask_jwt_extended import get_jwt_identity

from .. import db, connections_users
from ..blacklist_helpers import revoke_all_tokens
from ..models import User
from ..functions import submit_user
from .validations import get_password_errors, get_metadata_errors
from ..protections import user_required, admin_required, content_json

import itertools

# Blueprint named "user_api"; it is created with url_prefix="/api", so the
# routes registered on it in this module are served under that prefix.
bp = Blueprint("user_api", __name__, url_prefix="/api")


@bp.route("/user", methods=["GET"])
@user_required  # TODO: SECURITY: Maybe admin_required to view all users?
def get_all_users():
    """List every user.

    Responds with status 200 and a JSON body whose ``users`` entry holds the
    ``to_dict()`` representation of each row returned by ``User.query.all()``.
    """
    serialized = [account.to_dict() for account in User.query.all()]
    return jsonify(users=serialized), 200
@bp.route("/user/<id_>", methods=["GET"])
@user_required  # TODO: SECURITY: Maybe admin_required to view other-than-me?
def get_user(id_):
    """Return a single user looked up by its id.

    :param id_: identifier taken from the URL and passed to ``User.by_id``.
    :returns: ``(json, 200)`` with the user's ``to_dict()`` under ``user``, or
        ``(json, 404)`` with a ``message`` when the lookup yields nothing.
    """
    found = User.by_id(id_)
    if found:
        return jsonify(user=found.to_dict()), 200
    return jsonify(message="User not found."), 404
@bp.route("/user", methods=["POST"])
@admin_required
@content_json(["username", "password", "role"])
def create_user():
    """Create a new user from the JSON request body.

    The body supplies ``username``, ``password`` and ``role``; ``email`` is
    optional and defaults to an empty string.

    :returns: ``(json, 201)`` with a confirmation ``message`` and the new
        user's ``to_dict()`` on success, or ``(json, 400)`` with an ``errors``
        mapping when validation fails or the username is already in use.
    """
    payload = request.get_json()
    password = payload["password"]
    metadata = {
        "username": payload["username"],
        "role": payload["role"],
        "email": payload.get("email", ""),
    }

    # Both validators are merged into one field -> message mapping.
    error_pairs = itertools.chain(
        get_metadata_errors(**metadata),
        get_password_errors(password),
    )
    errors = dict(error_pairs)

    # A clash with an existing account overrides any other username error.
    if User.by_username(payload["username"]):
        errors["username"] = "Username already taken."

    if errors:
        return jsonify(message="User creation failed.", errors=errors), 400

    created = submit_user(password=password, **metadata)
    confirmation = "User '{}' created.".format(created.username)
    return jsonify(message=confirmation, user=created.to_dict()), 201
@bp.route("/user/<id_>/password", methods=["PUT"])
@user_required
@content_json(["old_password", "new_password"])
def change_password(id_):
    """Change the password of the user identified by ``id_``.

    The request is rejected unless ``id_`` equals the JWT identity of the
    caller and ``old_password`` passes ``user.check_password``.

    :param id_: identifier taken from the URL.
    :returns: ``(json, 200)`` on success; ``401`` when the identity does not
        match or the old password is wrong; ``404`` when the user does not
        exist; ``400`` with an ``errors`` mapping when the new password fails
        validation.
    """
    if id_ != get_jwt_identity():  # TODO: Maybe make this a decorator?
        return jsonify(message="Invalid jwt identity."), 401

    account = User.by_id(id_)
    if account is None:
        return jsonify(message="User not found."), 404

    payload = request.get_json()
    if not account.check_password(payload["old_password"]):
        return jsonify(message="Old password does not match!"), 401

    new_password = payload["new_password"]
    errors = dict(get_password_errors(new_password))
    if errors:
        return jsonify(message="Password change failed.", errors=errors), 400

    account.set_password(new_password)
    db.session.commit()
    return jsonify(message="Password changed successfully."), 200
@bp.route("/user/<id_>", methods=["DELETE"])
@admin_required
def delete_user(id_):
    """Delete a user, revoke their tokens and drop their socket connections.

    :param id_: identifier taken from the URL.
    :returns: ``(json, 200)`` with a confirmation ``message`` on success;
        ``403`` when ``id_`` equals the caller's JWT identity; ``404`` when no
        such user exists.
    """
    if id_ == get_jwt_identity():
        return jsonify(message="Cannot delete yourself."), 403

    user = User.by_id(id_)
    if not user:
        return jsonify(message="User not found."), 404

    # Read the name while the instance is still attached to a live row, so the
    # response message never depends on attribute access on a deleted object.
    username = user.username

    db.session.delete(user)
    db.session.commit()

    revoke_all_tokens(id_)

    # Iterate over a snapshot: disconnecting a client may run handlers that
    # modify the connection registry while this loop is still walking it.
    # NOTE(review): assumes disconnect handlers update connections_users -
    # confirm against the "/client" namespace handlers.
    for sid in list(connections_users.by_pubid.get(id_, [])):
        disconnect(sid, namespace="/client")

    return jsonify(message="User '{}' has been deleted.".format(username)), 200
@bp.route("/user/<id_>", methods=["PATCH"])
@admin_required
@content_json([])
def modify_user(id_):
    """Update the username, role and/or email of an existing user.

    Fields missing from the JSON body keep the user's current value.

    :param id_: identifier taken from the URL.
    :returns: ``(json, 200)`` with a confirmation ``message`` on success;
        ``404`` when the user does not exist; ``400`` with an ``errors``
        mapping when validation fails or the new username is already in use.
    """
    target = User.by_id(id_)
    if target is None:
        return jsonify(message="User not found."), 404

    payload = request.get_json()
    # Fall back to the stored value for every field the request omits.
    fields = {
        key: payload.get(key, getattr(target, key))
        for key in ("username", "role", "email")
    }

    errors = dict(get_metadata_errors(**fields))

    # Only check for a clash when the username is actually being changed.
    renamed = fields["username"] != target.username
    if renamed and User.by_username(fields["username"]):
        errors["username"] = "Username already taken."

    if errors:
        return jsonify(message="User modification failed.", errors=errors), 400

    submit_user(password=None, **fields, replace_user=target)
    confirmation = "User '{}' modified.".format(target.username)
    return jsonify(message=confirmation), 200