digitaldemocratic/dd-sso/admin/src/admin/views/ApiViews.py

478 lines
18 KiB
Python

#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
# DD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# DD is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import copy
import json
import logging as log
import os
import traceback
from operator import itemgetter
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional
from flasgger import Swagger
from flasgger.utils import swag_from
from flask import request
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from ..lib.api_exceptions import Error
from .decorators import OptionalJsonResponse, has_token
def _error_response(msg, code):
    """Build a ``(body, status, headers)`` tuple carrying a JSON error body.

    The body is the JSON encoding of ``{"msg": msg, "code": code,
    "error": True}`` and the status element repeats ``code``.
    """
    body = json.dumps({"msg": msg, "code": code, "error": True})
    return body, code, {"Content-Type": "application/json"}


# Canned error responses returned verbatim by the API views below.
ERR_500 = _error_response("Internal server error", 500)
ERR_501 = _error_response("Not implemented yet", 501)
ERR_400 = _error_response("Bad request", 400)
ERR_404 = _error_response("Not found", 404)
ERR_409 = _error_response("Conflict", 409)
ERR_412 = _error_response("Precondition failed", 412)
def setup_api_views(app: "AdminFlaskApp") -> None:
    """Register the Swagger metadata and all ``/ddapi/...`` routes on *app*.

    Every handler is protected by ``has_token`` and returns either a
    ``(json_body, status, headers)`` tuple or ``None`` when the request method
    is not one the handler deals with. Unexpected exceptions are logged with
    their traceback and turned into ``ERR_500``.

    Handlers are documented with ``#`` comments instead of docstrings so that
    nothing new is exposed to flasgger, which can read view docstrings.

    :param app: the admin Flask application; it must provide ``json_route``,
        ``admin`` and ``validators`` as used below.
    """
    swagger = Swagger(
        app,
        template={
            "info": {
                "title": "DD API NotaBLE",
                "description":
                """NotaBLE és la col·laboració entre Gwido i el Workspace educatiu DD.
És un projecte de Xnet, IsardVDI, Gwido i Taller de Músics, guanyador de la Ciutat Proactiva 2021, suport a la innovació urbana de la Fundació BitHabitat.""",
                "version": "2022.11.0",
                "termsOfService": "",
            },
            "externalDocs": {
                "description": "Online Documentation",
                "url": "https://dd.digitalitzacio-democratica.xnet-x.net/docs/integrations.ca/",
            },
            "securityDefinitions": {
                "dd_jwt": {
                    "type": "apiKey",
                    "in": "header",
                    "name": "Authorization",
                    "description": "JWS token using API_SECRET (e.g. 'bearer X.Y')",
                }
            },
            "security": {"dd_jwt": {"$ref": "#/securityDefinitions/dd_jwt"}},
            # Any non-empty SWAGGER_UI value (even "0" or "false") yields True.
            "swagger_ui": bool(os.environ.get("SWAGGER_UI", "")),
        },
    )

    def json_ok(payload: Any) -> OptionalJsonResponse:
        """Return *payload* serialized as a 200 JSON response tuple."""
        return json.dumps(payload), 200, {"Content-Type": "application/json"}

    # LISTS
    @app.json_route("/ddapi/users", methods=["GET"], endpoint="api_users")
    @swag_from("api_docs/users.yml", endpoint="api_users")
    @has_token
    def ddapi_users() -> OptionalJsonResponse:
        # List every user, sorted by username.
        try:
            if request.method == "GET":
                sorted_users = sorted(
                    app.admin.get_mix_users(), key=itemgetter("username")
                )
                return json_ok([user_parser(user) for user in sorted_users])
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    @app.json_route(
        "/ddapi/users/filter", methods=["POST"], endpoint="api_users_filter"
    )
    @swag_from("api_docs/users_filter.yml", endpoint="api_users_filter")
    @has_token
    def ddapi_users_search() -> OptionalJsonResponse:
        # Return the users matching the "text" field of the JSON body,
        # sorted by their API id (the username).
        try:
            if request.method == "POST":
                try:
                    data = request.get_json(force=True)
                    text = data.get("text")
                except Exception:
                    # Unparseable body or a payload that is not a JSON object.
                    return ERR_400
                # A missing, empty or non-string search term is a client
                # error; a non-string would otherwise blow up in filter_users.
                if not text or not isinstance(text, str):
                    log.debug("Incorrect data requested.")
                    return ERR_400
                users = app.admin.get_mix_users()
                result = [user_parser(user) for user in filter_users(users, text)]
                return json_ok(sorted(result, key=itemgetter("id")))
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    @app.json_route("/ddapi/groups", methods=["GET"], endpoint="api_groups")
    @swag_from("api_docs/groups.yml", endpoint="api_groups")
    @has_token
    def ddapi_groups() -> OptionalJsonResponse:
        # List every group, sorted by name.
        try:
            if request.method == "GET":
                sorted_groups = sorted(
                    app.admin.get_mix_groups(), key=itemgetter("name")
                )
                return json_ok([group_parser(group) for group in sorted_groups])
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    @app.json_route("/ddapi/roles", methods=["GET"], endpoint="api_roles")
    @swag_from("api_docs/roles.yml", endpoint="api_roles")
    @has_token
    def ddapi_roles() -> OptionalJsonResponse:
        # List every role, sorted by name.
        try:
            if request.method == "GET":
                sorted_roles = sorted(app.admin.get_roles(), key=itemgetter("name"))
                return json_ok([role_parser(role) for role in sorted_roles])
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    @app.json_route("/ddapi/role/users", methods=["POST"], endpoint="api_role_users")
    @swag_from("api_docs/role_users.yml", endpoint="api_role_users")
    @has_token
    def ddapi_role_users() -> OptionalJsonResponse:
        # List the users holding the role named by "id", "name" or
        # "keycloak_id" (first non-empty one wins), sorted by username.
        try:
            if request.method == "POST":
                try:
                    data = request.get_json(force=True)
                except Exception:
                    return ERR_400
                # Explicit check instead of `assert`, which vanishes under -O.
                if not isinstance(data, dict):
                    return ERR_400
                role = data.get("id") or data.get("name") or data.get("keycloak_id")
                if not role:
                    return ERR_400
                sorted_users = sorted(
                    app.admin.get_mix_users(), key=itemgetter("username")
                )
                return json_ok(
                    [
                        user_parser(user)
                        for user in sorted_users
                        if role in user["roles"]
                    ]
                )
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    # INDIVIDUAL ACTIONS
    @app.json_route("/ddapi/user", methods=["POST"], endpoint="api_user_new")
    @app.json_route(
        "/ddapi/user/<user_ddid>",
        methods=["PUT", "GET", "DELETE"],
        endpoint="api_user_ddid",
    )
    @swag_from("api_docs/user_new.yml", endpoint="api_user_new")
    @swag_from("api_docs/user_get.yml", endpoint="api_user_ddid", methods=["GET"])
    @swag_from("api_docs/user_put.yml", endpoint="api_user_ddid", methods=["PUT"])
    @swag_from("api_docs/user_delete.yml", endpoint="api_user_ddid", methods=["DELETE"])
    @has_token
    def ddapi_user(user_ddid: Optional[str] = None) -> OptionalJsonResponse:
        # Create (POST), read (GET), delete (DELETE) or update (PUT) one user.
        # `user_ddid` is the username taken from the URL.
        try:
            uid: str = user_ddid if user_ddid else ""
            if request.method == "POST":
                if uid:
                    return ERR_400
                try:
                    data = request.get_json(force=True)
                    if not app.validators["user"].validate(data):
                        log.debug(
                            "Data validation for user failed: "
                            + str(app.validators["user"].errors)
                        )
                        return ERR_400
                except Exception:
                    return ERR_400

                if app.admin.get_user_username(data["username"]):
                    return ERR_409
                data = app.validators["user"].normalized(data)
                try:
                    keycloak_id = app.admin.add_user(data)
                    if not keycloak_id:
                        # Original note: "Group does not exist already".
                        return ERR_412
                except Error as e:
                    if e.error.get("error") == "conflict":
                        # It already exists
                        return ERR_409
                    log.error(traceback.format_exc())
                    return ERR_500
                return json_ok({"keycloak_id": keycloak_id})

            if request.method == "GET":
                user = app.admin.get_user_username(uid)
                if not user:
                    return ERR_404
                return json_ok(user_parser(user))

            if request.method == "DELETE":
                user = app.admin.get_user_username(uid)
                if not user:
                    return ERR_404
                app.admin.delete_user(user["id"])
                return json_ok({})

            if request.method == "PUT":
                user = app.admin.get_user_username(uid)
                if not user:
                    return ERR_404
                try:
                    data = request.get_json(force=True)
                    if not data or not app.validators["user_update"].validate(data):
                        log.debug(
                            "Data validation for user failed: "
                            + str(app.validators["user_update"].errors)
                        )
                        return ERR_400
                except Exception:
                    return ERR_400
                data = app.validators["user_update"].normalized(data)

                # Work on a copy so the object returned by the admin layer is
                # never mutated.
                updated = copy.deepcopy(user)
                updated["username"] = uid
                # Fields whose API name equals the internal name.
                for field in ("email", "quota", "enabled", "groups"):
                    if field in data:
                        updated[field] = data[field]
                # Fields that are renamed between the API and the admin layer.
                for internal, external in (("firstname", "first"), ("lastname", "last")):
                    if external in data:
                        updated[internal] = data[external]
                # The API exposes a single role; the admin layer takes a list.
                if "role" in data:
                    updated["roles"] = [data["role"]]
                app.admin.user_update(updated)

                if data.get("password"):
                    # NOTE(review): presumably the validator fills in
                    # "password_temporary"; fall back to True (temporary) so
                    # an omitted flag cannot raise after the user was already
                    # updated - confirm against the user_update schema.
                    app.admin.user_update_password(
                        user["id"],
                        data["password"],
                        data.get("password_temporary", True),
                    )
                return json_ok({})
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    @app.json_route("/ddapi/username/<old_user_ddid>/<new_user_did>", methods=["PUT"])
    @has_token
    def ddapi_username(old_user_ddid: str, new_user_did: str) -> OptionalJsonResponse:
        # Renaming a user is not implemented: 404 for unknown users,
        # 501 otherwise.
        try:
            user = app.admin.get_user_username(old_user_ddid)
            if not user:
                return ERR_404
            # user = app.admin.update_user_username(old_user_ddid,new_user_did)
            return ERR_501
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500

    @app.json_route("/ddapi/group", methods=["POST"], endpoint="api_group_new")
    @app.json_route(
        "/ddapi/group/<group_id>",
        methods=["GET", "POST", "DELETE"],
        endpoint="api_group_group_id",
    )
    @swag_from("api_docs/group_new.yml", endpoint="api_group_new")
    @swag_from("api_docs/group_get.yml", endpoint="api_group_group_id", methods=["GET"])
    # @swag_from('api_docs/group_put.yml', endpoint='api_group_group_id', methods=["PUT"])
    @swag_from(
        "api_docs/group_delete.yml", endpoint="api_group_group_id", methods=["DELETE"]
    )
    # @app.json_route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
    @has_token
    def ddapi_group(group_id: Optional[str] = None) -> OptionalJsonResponse:
        # Create (POST), read (GET) or delete (DELETE) one group.
        # `group_id` is the group name taken from the URL.
        try:
            uid: str = group_id if group_id else ""
            # /ddapi/group
            if request.method == "POST":
                try:
                    data = request.get_json(force=True)
                    if not app.validators["group"].validate(data):
                        log.debug(
                            "Data validation for group failed: "
                            + str(app.validators["group"].errors)
                        )
                        return ERR_400
                except Exception:
                    return ERR_400
                data = app.validators["group"].normalized(data)
                # An empty parent means "no parent".
                if data["parent"] == "":
                    data["parent"] = None

                # NOTE(review): the conflict check uses the id from the URL,
                # which is "" when posting to /ddapi/group, not the submitted
                # data - confirm whether it should look at the payload.
                if app.admin.get_group_by_name(uid):
                    return ERR_409

                app.admin.add_group(data)
                return json_ok({"keycloak_id": None})

            # /ddapi/group/<group_id>
            if request.method == "GET":
                group = app.admin.get_group_by_name(uid)
                if not group:
                    return ERR_404
                return json_ok(group_parser(group))

            if request.method == "DELETE":
                group = app.admin.get_group_by_name(uid)
                if not group:
                    return ERR_404
                app.admin.delete_group_by_id(group["id"])
                return json_ok({})
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None

    @app.json_route("/ddapi/user_mail", methods=["POST"])
    @app.json_route("/ddapi/user_mail/<id>", methods=["GET", "DELETE"])
    @has_token
    def ddapi_user_mail(id: Optional[str] = None) -> OptionalJsonResponse:
        # POST a non-empty JSON list of mail entries; GET and DELETE are not
        # implemented. The parameter is named `id` to match the route.
        try:
            # TODO: Remove this endpoint when we ensure there are no consumers
            if request.method in ["GET", "DELETE"]:
                return ERR_501
            if request.method == "POST":
                try:
                    data = request.get_json(force=True)
                    # Explicit check instead of `assert`, which vanishes
                    # under -O.
                    if not isinstance(data, list) or not data:
                        return ERR_400
                    for user in data:
                        if not app.validators["mail"].validate(user):
                            log.debug(
                                "Data validation for mail failed: "
                                + str(app.validators["mail"].errors)
                            )
                            return ERR_400
                except Exception:
                    return ERR_400

                for user in data:
                    log.info("Added user email")
                    app.admin.nextcloud_mail_set([user], {})
                return json_ok("Users emails updated")
        except Exception:
            log.error(traceback.format_exc())
            return ERR_500
        return None
def role_parser(role: Dict[str, str]) -> Dict[str, Any]:
    """Convert an internal role mapping into its public API representation.

    The internal ``id`` is exposed as ``keycloak_id``; the role ``name`` is
    used both as the API ``id`` and as ``name``. ``description`` defaults to
    an empty string when the role has none.
    """
    parsed: Dict[str, Any] = {"keycloak_id": role["id"]}
    role_name = role["name"]
    parsed["id"] = role_name
    parsed["name"] = role_name
    parsed["description"] = role.get("description", "")
    return parsed
# TODO: After this line, this is all mostly duplicated from other places...
def user_parser(user: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an internal user mapping into its public API representation.

    The internal ``id`` is exposed as ``keycloak_id`` and the ``username``
    doubles as the API ``id``. Only the first entry of ``roles`` is exposed
    (``None`` when the user has no roles). ``groups`` is taken from the
    ``groups`` key when present, otherwise from ``keycloak_groups``.

    :raises KeyError: if a required key is missing, or if neither ``groups``
        nor ``keycloak_groups`` is present.
    """
    roles = user["roles"]
    # Look up "keycloak_groups" only when "groups" is absent: passing it as
    # the dict.get() default evaluates it eagerly and raises KeyError for
    # users that carry "groups" but no "keycloak_groups".
    groups = user["groups"] if "groups" in user else user["keycloak_groups"]
    return {
        "keycloak_id": user["id"],
        "id": user["username"],
        "username": user["username"],
        "enabled": user["enabled"],
        "first": user["first"],
        "last": user["last"],
        "role": roles[0] if roles else None,
        "email": user["email"],
        "groups": groups,
        "quota": user["quota"],
        "quota_used_bytes": user["quota_used_bytes"],
    }
def group_parser(group: Dict[str, str]) -> Dict[str, Any]:
    """Convert an internal group mapping into its public API representation.

    The internal ``id`` is exposed as ``keycloak_id`` and the full group name
    becomes the API ``id``. The API ``name`` is the part of the full name
    after its last dot. ``description`` defaults to an empty string.
    """
    keycloak_id = group["id"]
    full_name = group["name"]
    # Everything after the last "." (the whole name when there is no dot).
    short_name = full_name.rsplit(".", 1)[-1]
    return dict(
        keycloak_id=keycloak_id,
        id=full_name,
        name=short_name,
        path=group["path"],
        description=group.get("description", ""),
    )
def filter_users(users: Iterable[Dict[str, Any]], text: str) -> List[Dict[str, Any]]:
    """Return the users whose username, first name, last name or email contains *text*.

    Matching is a case-sensitive substring test. A field that is missing or
    holds ``None`` (or any other falsy value) is treated as empty instead of
    raising, so one incomplete user record cannot break the whole search.

    :param users: user mappings to search; input order is preserved.
    :param text: substring to look for.
    :returns: the matching user mappings (the same objects, not copies).
    """
    searched_fields = ("username", "first", "last", "email")

    def matches(user: Dict[str, Any]) -> bool:
        return any(text in (user.get(field) or "") for field in searched_fields)

    return [user for user in users if matches(user)]