[dd-sso] Add tests and refactor API

These tests can be executed with:
python -m unittest discover -s admin.views.test
merge-requests/55/head
Evilham 2022-12-11 14:00:47 +01:00
parent 579af2b31c
commit 10e6afe351
No known key found for this signature in database
GPG Key ID: AE3EE30D970886BF
7 changed files with 1559 additions and 543 deletions

View File

@ -32,6 +32,8 @@ types-psycopg2 = "*"
types-pyyaml = "*"
types-python-jose = "*"
types-pillow = "*"
flask-unittest = "*"
flake8 = "*"
[requires]
python_version = "3.8"

File diff suppressed because it is too large Load Diff

5
dd-sso/admin/setup.cfg Normal file
View File

@ -0,0 +1,5 @@
[flake8]
profile = "black"
max-line-length = 88
extend-ignore = E203
statistics = True

View File

@ -18,319 +18,373 @@
# along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import copy
import json
import logging as log
from operator import itemgetter
import os
import socket
import sys
import time
import traceback
from operator import itemgetter
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional
from flask import request
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from ..lib.api_exceptions import Error
from .decorators import has_token, OptionalJsonResponse
from .decorators import OptionalJsonResponse, has_token
ERR_500 = (
json.dumps({"msg": "Internal server error", "code": 500, "error": True}),
500,
{"Content-Type": "application/json"},
)
ERR_501 = (
json.dumps({"msg": "Not implemented yet", "code": 501, "error": True}),
501,
{"Content-Type": "application/json"},
)
ERR_400 = (
json.dumps({"msg": "Bad request", "code": 400, "error": True}),
400,
{"Content-Type": "application/json"},
)
ERR_404 = (
json.dumps({"msg": "Not found", "code": 404, "error": True}),
404,
{"Content-Type": "application/json"},
)
ERR_409 = (
json.dumps({"msg": "Conflict", "code": 409, "error": True}),
409,
{"Content-Type": "application/json"},
)
ERR_412 = (
json.dumps({"msg": "Precondition failed", "code": 412, "error": True}),
412,
{"Content-Type": "application/json"},
)
def setup_api_views(app : "AdminFlaskApp") -> None:
## LISTS
def setup_api_views(app: "AdminFlaskApp") -> None:
# LISTS
@app.json_route("/ddapi/users", methods=["GET"])
@has_token
def ddapi_users() -> OptionalJsonResponse:
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
users = []
for user in sorted_users:
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
try:
if request.method == "GET":
sorted_users = sorted(
app.admin.get_mix_users(), key=itemgetter("username")
)
users = []
for user in sorted_users:
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
@app.json_route("/ddapi/users/filter", methods=["POST"])
@has_token
def ddapi_users_search() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
if not data.get("text"):
raise Error("bad_request", "Incorrect data requested.")
users = app.admin.get_mix_users()
result = [user_parser(user) for user in filter_users(users, data["text"])]
sorted_result = sorted(result, key=itemgetter("id"))
return json.dumps(sorted_result), 200, {"Content-Type": "application/json"}
try:
if request.method == "POST":
try:
data = request.get_json(force=True)
if not data.get("text"):
raise Error("bad_request", "Incorrect data requested.")
except Exception:
return ERR_400
users = app.admin.get_mix_users()
result = [
user_parser(user) for user in filter_users(users, data["text"])
]
sorted_result = sorted(result, key=itemgetter("id"))
return (
json.dumps(sorted_result),
200,
{"Content-Type": "application/json"},
)
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
@app.json_route("/ddapi/groups", methods=["GET"])
@has_token
def ddapi_groups() -> OptionalJsonResponse:
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=itemgetter("name"))
groups = []
for group in sorted_groups:
groups.append(group_parser(group))
return json.dumps(groups), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/ddapi/group/users", methods=["POST"])
@has_token
def ddapi_group_users() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
if data.get("id"):
group_users = [
user_parser(user)
for user in sorted_users
if data.get("id") in user["keycloak_groups"]
]
elif data.get("path"):
try:
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["path"] == data.get("path")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
raise Error("not_found", "Group path not found in system")
elif data.get("keycloak_id"):
try:
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["id"] == data.get("keycloak_id")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
raise Error("not_found", "Group keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(group_users), 200, {"Content-Type": "application/json"}
try:
if request.method == "GET":
sorted_groups = sorted(
app.admin.get_mix_groups(), key=itemgetter("name")
)
groups = []
for group in sorted_groups:
groups.append(group_parser(group))
return json.dumps(groups), 200, {"Content-Type": "application/json"}
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
@app.json_route("/ddapi/roles", methods=["GET"])
@has_token
def ddapi_roles() -> OptionalJsonResponse:
if request.method == "GET":
roles = []
for role in sorted(app.admin.get_roles(), key=itemgetter("name")):
log.error(role)
roles.append(
{
"keycloak_id": role["id"],
"id": role["name"],
"name": role["name"],
"description": role.get("description", ""),
}
)
return json.dumps(roles), 200, {"Content-Type": "application/json"}
try:
if request.method == "GET":
roles = []
for role in sorted(app.admin.get_roles(), key=itemgetter("name")):
roles.append(role_parser(role))
return json.dumps(roles), 200, {"Content-Type": "application/json"}
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
@app.json_route("/ddapi/role/users", methods=["POST"])
@has_token
def ddapi_role_users() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
if data.get("id", data.get("name")):
role_users = [
user_parser(user)
for user in sorted_users
if data.get("id", data.get("name")) in user["roles"]
]
elif data.get("keycloak_id"):
try:
if request.method == "POST":
try:
id = [
r["id"]
for r in app.admin.get_roles()
if r["id"] == data.get("keycloak_id")
][0]
data = request.get_json(force=True)
assert isinstance(data, dict)
except Exception:
return ERR_400
sorted_users = sorted(
app.admin.get_mix_users(), key=itemgetter("username")
)
role: str = data.get("id", "")
if not role:
role = data.get("name", "")
if not role:
role = data.get("keycloak_id", "")
if role:
role_users = [
user_parser(user) for user in sorted_users if id in user["roles"]
user_parser(user)
for user in sorted_users
if role in user["roles"]
]
except:
raise Error("not_found", "Role keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(role_users), 200, {"Content-Type": "application/json"}
else:
return ERR_400
return json.dumps(role_users), 200, {"Content-Type": "application/json"}
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
## INDIVIDUAL ACTIONS
# INDIVIDUAL ACTIONS
@app.json_route("/ddapi/user", methods=["POST"])
@app.json_route("/ddapi/user/<user_ddid>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_user(user_ddid : Optional[str]=None) -> OptionalJsonResponse:
uid : str = user_ddid if user_ddid else ''
if request.method == "GET":
user = app.admin.get_user_username(uid)
if not user:
raise Error("not_found", "User id not found")
return json.dumps(user_parser(user)), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
user = app.admin.get_user_username(uid)
if not user:
raise Error("not_found", "User id not found")
app.admin.delete_user(user["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["user"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: "
+ str(app.validators["user"].errors),
traceback.format_exc(),
def ddapi_user(user_ddid: Optional[str] = None) -> OptionalJsonResponse:
try:
uid: str = user_ddid if user_ddid else ""
if request.method == "POST":
if uid:
return ERR_400
try:
data = request.get_json(force=True)
if not app.validators["user"].validate(data):
log.debug(
"Data validation for user failed: "
+ str(app.validators["user"].errors)
)
log.debug(traceback.format_exc())
return ERR_400
except Exception:
return ERR_400
if app.admin.get_user_username(data["username"]):
return ERR_409
data = app.validators["user"].normalized(data)
try:
keycloak_id = app.admin.add_user(data)
if not keycloak_id:
# Group does not exist already
return ERR_412
except Error as e:
if e.error.get("error") == "conflict":
# It already exists
return ERR_409
log.error(traceback.format_exc())
return ERR_500
return (
json.dumps({"keycloak_id": keycloak_id}),
200,
{"Content-Type": "application/json"},
)
if app.admin.get_user_username(data["username"]):
raise Error("conflict", "User id already exists")
data = app.validators["user"].normalized(data)
keycloak_id = app.admin.add_user(data)
if not keycloak_id:
raise Error(
"precondition_required",
"Not all user groups already in system. Please create user groups before adding user.",
if request.method == "GET":
user = app.admin.get_user_username(uid)
if not user:
return ERR_404
return (
json.dumps(user_parser(user)),
200,
{"Content-Type": "application/json"},
)
return (
json.dumps({"keycloak_id": keycloak_id}),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
user = app.admin.get_user_username(uid)
if not user:
return ERR_404
app.admin.delete_user(user["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "PUT":
user = app.admin.get_user_username(uid)
if not user:
raise Error("not_found", "User id not found")
data = request.get_json(force=True)
if not app.validators["user_update"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: "
+ str(app.validators["user_update"].errors),
traceback.format_exc(),
)
data = {**user, **data}
data = app.validators["user_update"].normalized(data)
data = {**data, **{"username": uid}}
data["roles"] = [data.pop("role")]
data["firstname"] = data.pop("first")
data["lastname"] = data.pop("last")
app.admin.user_update(data)
if data.get("password"):
app.admin.user_update_password(
user["id"], data["password"], data["password_temporary"]
)
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "PUT":
user = app.admin.get_user_username(uid)
if not user:
return ERR_404
try:
data = request.get_json(force=True)
if not data or not app.validators["user_update"].validate(data):
log.debug(
"Data validation for user failed: "
+ str(app.validators["user_update"].errors)
)
log.debug(traceback.format_exc())
return ERR_400
except Exception:
return ERR_400
data = app.validators["user_update"].normalized(data)
# Work with a secure copy
u = copy.deepcopy(user)
u.update({"username": uid})
# Update object from API-provided data
for p in ["email", "quota", "enabled", "groups"]:
if p in data:
u[p] = data[p]
for lp, rp in [("firstname", "first"), ("lastname", "last")]:
if rp in data:
u[lp] = data[rp]
# And role
if "role" in data:
u["roles"] = [data["role"]]
app.admin.user_update(u)
if data.get("password"):
app.admin.user_update_password(
user["id"], data["password"], data["password_temporary"]
)
return json.dumps({}), 200, {"Content-Type": "application/json"}
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
@app.json_route("/ddapi/username/<old_user_ddid>/<new_user_did>", methods=["PUT"])
@has_token
def ddapi_username(old_user_ddid : str, new_user_did : str) -> OptionalJsonResponse:
user = app.admin.get_user_username(old_user_ddid)
if not user:
raise Error("not_found", "User id not found")
# user = app.admin.update_user_username(old_user_ddid,new_user_did)
return json.dumps("Not implemented yet!"), 419, {"Content-Type": "application/json"}
def ddapi_username(old_user_ddid: str, new_user_did: str) -> OptionalJsonResponse:
try:
user = app.admin.get_user_username(old_user_ddid)
if not user:
return ERR_404
# user = app.admin.update_user_username(old_user_ddid,new_user_did)
return ERR_501
except Exception:
log.error(traceback.format_exc())
return ERR_500
@app.json_route("/ddapi/group", methods=["POST"])
@app.json_route("/ddapi/group/<group_id>", methods=["GET", "POST", "DELETE"])
# @app.json_route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_group(group_id : Optional[str]=None) -> OptionalJsonResponse:
uid : str = group_id if group_id else ''
if request.method == "GET":
group = app.admin.get_group_by_name(uid)
if not group:
Error("not found", "Group id not found")
return (
json.dumps(group_parser(group)),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["group"].validate(data):
raise Error(
"bad_request",
"Data validation for group failed: "
+ str(app.validators["group"].errors),
traceback.format_exc(),
def ddapi_group(group_id: Optional[str] = None) -> OptionalJsonResponse:
try:
uid: str = group_id if group_id else ""
# /ddapi/group
if request.method == "POST":
try:
data = request.get_json(force=True)
if not app.validators["group"].validate(data):
log.debug(
"Data validation for group failed: "
+ str(app.validators["group"].errors)
)
log.debug(traceback.format_exc())
return ERR_400
except Exception:
return ERR_400
data = app.validators["group"].normalized(data)
data["parent"] = data["parent"] if data["parent"] != "" else None
if app.admin.get_group_by_name(uid):
return ERR_409
app.admin.add_group(data)
return (
json.dumps({"keycloak_id": None}),
200,
{"Content-Type": "application/json"},
)
data = app.validators["group"].normalized(data)
data["parent"] = data["parent"] if data["parent"] != "" else None
if app.admin.get_group_by_name(uid):
raise Error("conflict", "Group id already exists")
path = app.admin.add_group(data)
# log.error(path)
# keycloak_id = app.admin.get_group_by_name(id)["id"]
# log.error()
return (
json.dumps({"keycloak_id": None}),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
group = app.admin.get_group_by_name(uid)
if not group:
raise Error("not_found", "Group id not found")
app.admin.delete_group_by_id(group["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
# /ddapi/group/<group_id>
if request.method == "GET":
group = app.admin.get_group_by_name(uid)
if not group:
return ERR_404
return (
json.dumps(group_parser(group)),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
group = app.admin.get_group_by_name(uid)
if not group:
return ERR_404
app.admin.delete_group_by_id(group["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
@app.json_route("/ddapi/user_mail", methods=["POST"])
@app.json_route("/ddapi/user_mail/<id>", methods=["GET", "DELETE"])
@has_token
def ddapi_user_mail(id : Optional[str]=None) -> OptionalJsonResponse:
# TODO: Remove this endpoint when we ensure there are no consumers
if request.method == "GET":
return (
json.dumps("Not implemented yet"),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
def ddapi_user_mail(id: Optional[str] = None) -> OptionalJsonResponse:
try:
# TODO: Remove this endpoint when we ensure there are no consumers
if request.method in ["GET", "DELETE"]:
return ERR_501
if request.method == "POST":
try:
data = request.get_json(force=True)
assert isinstance(data, list) and data
for user in data:
if not app.validators["mail"].validate(user):
log.debug(
"Data validation for mail failed: "
+ str(app.validators["mail"].errors)
)
log.debug(traceback.format_exc())
return ERR_400
except Exception:
return ERR_400
# if not app.validators["mails"].validate(data):
# raise Error(
# "bad_request",
# "Data validation for mail failed: "
# + str(app.validators["mail"].errors),
# traceback.format_exc(),
# )
for user in data:
if not app.validators["mail"].validate(user):
raise Error(
"bad_request",
"Data validation for mail failed: "
+ str(app.validators["mail"].errors),
traceback.format_exc(),
)
for user in data:
log.info("Added user email")
app.admin.nextcloud_mail_set([user], dict())
return (
json.dumps("Users emails updated"),
200,
{"Content-Type": "application/json"},
)
for user in data:
log.info("Added user email")
app.admin.nextcloud_mail_set([user], dict())
return (
json.dumps("Users emails updated"),
200,
{"Content-Type": "application/json"},
)
except Exception:
log.error(traceback.format_exc())
return ERR_500
return None
def role_parser(role: Dict[str, str]) -> Dict[str, Any]:
return {
"keycloak_id": role["id"],
"id": role["name"],
"name": role["name"],
"description": role.get("description", ""),
}
# TODO: After this line, this is all mostly duplicated from other places...
def user_parser(user : Dict[str, Any]) -> Dict[str, Any]:
def user_parser(user: Dict[str, Any]) -> Dict[str, Any]:
return {
"keycloak_id": user["id"],
"id": user["username"],
@ -346,7 +400,7 @@ def user_parser(user : Dict[str, Any]) -> Dict[str, Any]:
}
def group_parser(group : Dict[str, str]) -> Dict[str, Any]:
def group_parser(group: Dict[str, str]) -> Dict[str, Any]:
return {
"keycloak_id": group["id"],
"id": group["name"],
@ -356,7 +410,7 @@ def group_parser(group : Dict[str, str]) -> Dict[str, Any]:
}
def filter_users(users : Iterable[Dict[str, Any]], text : str) -> List[Dict[str, Any]]:
def filter_users(users: Iterable[Dict[str, Any]], text: str) -> List[Dict[str, Any]]:
return [
user
for user in users

View File

@ -0,0 +1,243 @@
"""
Mocks to test API views.
"""
import copy
from typing import Any, Dict, Iterable, List, Optional, cast
from admin.flaskapp import AdminFlaskApp
from admin.lib.admin import Admin
from admin.lib.keycloak_client import KeycloakClient
class MockKeycloakAdmin:
app: AdminFlaskApp
def __init__(self, app: AdminFlaskApp) -> None:
self.app = app
def _convert_kc(self, data: Dict[str, Any]) -> Dict[str, Any]:
u = {
"email": data["email"],
"id": data["id"],
"username": data["username"],
"first": data["firstName"],
"last": data["lastName"],
"enabled": data["enabled"],
"roles": [],
}
return u
def create_user(self, data: Dict[str, Any]) -> Any:
uid = data.get("username")
us = {u["id"]: u for u in self.app.admin.internal["users"]}
if not uid or uid in us:
raise Exception("Invalid or existing user")
u = copy.deepcopy(data)
u["id"] = uid
u = self._convert_kc(u)
self.app.admin.internal["users"].append(u)
return uid
def group_user_add(self, user_id: str, group_id: str) -> Any:
o = {u["id"]: u for u in self.app.admin.internal["users"]}.get(user_id, {})
o["groups"] = list(set(o.get("groups", []) + [group_id]))
return o
def delete_user(self, user_id: str) -> None:
self.app.admin.internal["users"] = [
u for u in self.app.admin.internal["users"] if u["id"] != user_id
]
def get_groups(self) -> List[Dict[str, Any]]:
return cast(List[Dict[str, Any]], self.app.admin.internal["groups"])
def get_realm_roles_of_user(self, user_id: str) -> List[Dict[str, Any]]:
o = {u["id"]: u for u in self.app.admin.internal["users"]}.get(user_id, {})
return [{"name": r} for r in o.get("roles", [])]
def get_group(self, group_id: str) -> Dict[str, Any]:
o = {g["id"]: g for g in self.app.admin.internal["groups"]}.get(group_id, {})
return cast(Dict[str, Any], o)
def create_group(self, group: Dict[str, Any], parent: Optional[str] = None) -> Any:
g = copy.deepcopy(group)
g.update(
{
"id": g["name"],
"path": "/".join(["", parent, g["name"]]) if parent else "",
}
)
self.app.admin.internal["groups"].append(g)
def delete_user_realm_role(self, user_id: str, roles: str) -> None:
o = {u["id"]: u for u in self.app.admin.internal["users"]}.get(user_id, {})
o["roles"] = [r for r in o.get("roles", []) if r not in roles]
o["keycloak_groups"] = o["roles"]
def update_user(self, user_id: str, payload: Dict[str, Any]) -> Any:
pass
class MockKeycloak(KeycloakClient):
app: AdminFlaskApp
def __init__(self, app: AdminFlaskApp) -> None:
self.app = app
self.keycloak_admin = MockKeycloakAdmin(app)
def get_group_by_path(self, path: str, recursive: bool = True) -> Any:
gs = {g["id"]: g for g in self.app.admin.internal["groups"]}
return gs.get(path.lstrip("/"))
def assign_realm_roles(self, uid: str, role: str) -> Dict[str, Any]:
o: Dict[str, Any] = {u["id"]: u for u in self.app.admin.internal["users"]}.get(
uid, {}
)
o["roles"] = list(set(o.get("roles", []) + [role]))
o["keycloak_groups"] = o["roles"]
return o
def connect(self) -> None:
pass
def user_update(
self,
user_id: str,
enabled: bool,
email: str,
first: str,
last: str,
groups: Iterable[str] = [],
roles: Iterable[str] = [],
) -> Dict[str, Any]:
o: Dict[str, Any] = {u["id"]: u for u in self.app.admin.internal["users"]}.get(
user_id, {}
)
o.update({"first": first, "last": last, "enabled": enabled})
o["groups"] = list(sorted(o["groups"]))
return o
def delete_group(self, group_id: str) -> None:
self.app.admin.internal["groups"] = [
g for g in self.app.admin.internal["groups"] if g["id"] != group_id
]
class MockMoodle:
app: AdminFlaskApp
def __init__(self, app: AdminFlaskApp):
self.app = app
def create_user(
self,
email: str,
username: str,
password: str,
first_name: str = "-",
last_name: str = "-",
) -> List[Dict[str, Any]]:
o = {u["id"]: u for u in self.app.admin.internal["users"]}.get(username, {})
o["moodle"] = True
o["moodle_id"] = username
return [{"id": username, "username": username}]
def get_cohorts(self) -> List[Dict[str, Any]]:
return cast(List[Dict[str, Any]], self.app.admin.internal["groups"])
def add_user_to_cohort(self, userid: str, cohortid: str) -> List[Dict[str, Any]]:
return [{"id": userid, "username": userid}]
def delete_users(self, user_id: str) -> None:
for u in user_id:
self.app.admin.delete_user(user_id)
def update_user(
self,
username: str,
email: str,
first_name: str,
last_name: str,
enabled: bool = True,
) -> None:
pass
def add_system_cohort(
self, name: str, description: str, visible: bool = True
) -> Dict[str, Any]:
return {"name": name}
def delete_cohorts(self, group_id: str) -> None:
pass
class MockNextcloud:
app: AdminFlaskApp
def __init__(self, app: AdminFlaskApp):
self.app = app
# nextcloud
def add_user_with_groups(
self,
userid: str,
userpassword: str,
quota: Any = False,
groups: Any = [],
email: str = "",
displayname: str = "",
) -> bool:
o = {u["id"]: u for u in self.app.admin.internal["users"]}.get(userid, {})
o["quota"] = quota
o["quota_used_bytes"] = 0
return True
def delete_user(self, userid: str) -> None:
pass
def update_user(self, user_id: str, user: Dict[str, Any]) -> Any:
pass
def add_user_to_group(self, user_id: str, group_id: str) -> bool:
return True
def add_group(self, groupid: str) -> bool:
return True
def delete_group(self, group_id: str) -> None:
pass
class MockAdmin(Admin):
"""
Mock class to easily test the API views.
"""
app: AdminFlaskApp
internal: Dict[str, Any]
def __init__(self, app: AdminFlaskApp):
"""
Constructor override so we can use the class for testing
"""
self.app = app
self.av = self # type: ignore
self.moodle = MockMoodle(app) # type: ignore
self.nextcloud = MockNextcloud(app) # type: ignore
self.third_party_cbs = []
# Initialise data
self.internal = {
"users": [],
"groups": [],
"roles": [],
}
def resync_data(self) -> bool:
return True
# avatars
def add_user_default_avatar(self, uid: str, role: str) -> None:
pass
def delete_user_avatar(self, uid: str) -> None:
pass

View File

@ -0,0 +1,555 @@
"""
"""
import copy
import os
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import flask_unittest
from flask.testing import FlaskClient
from jose import jws
if TYPE_CHECKING:
from werkzeug.test import TestResponse
from admin.flaskapp import AdminFlaskApp
from admin.lib.admin import MANAGER, STUDENT, TEACHER
from admin.views.ApiViews import group_parser, role_parser, user_parser
from admin.views.test.mocks import MockAdmin, MockKeycloak
def _testApp() -> AdminFlaskApp:
"""
Helper function to generate a testableFlask App
"""
app = AdminFlaskApp(
"TestApp", root_path="src/admin", template_folder="static/templates"
)
# Patch the admin
app.admin = MockAdmin(app)
# Patch keycloak client
app.admin.keycloak = MockKeycloak(app)
# Add socketio
from flask_socketio import SocketIO
app.socketio = SocketIO(app)
return app
class ApiViewsTests(flask_unittest.ClientTestCase):
#
# Instance fields
#
_app: Optional[AdminFlaskApp] = None
secret: str = "TestSecret"
tmpdir: TemporaryDirectory
#
# Instance properties
#
@property
def app(self) -> AdminFlaskApp:
if not self._app:
os.environ["DOMAIN"] = "dd-test.example"
os.environ["API_SECRET"] = self.secret
self._app = _testApp()
return self._app
@property
def auth_header(self) -> Dict[str, str]:
token = jws.sign({}, self.secret, algorithm="HS256")
return {"Authorization": f"bearer {token}"}
#
# Test setup
#
def setUp(self, client: FlaskClient) -> None:
self.tmpdir = TemporaryDirectory()
os.environ["NC_MAIL_QUEUE_FOLDER"] = os.path.join(
str(self.tmpdir.name), "nc_queue"
)
def tearDown(self, client: FlaskClient) -> None:
self.tmpdir.cleanup()
self._app = None
#
# Helper functions
#
def _ur(
self,
client: FlaskClient,
route: str,
method: str = "GET",
response_code: int = 401,
) -> "TestResponse":
"""
Helper function to ensure endpoints without authenticating
"""
rv = client.open(route, method=method)
self.assertStatus(rv, response_code)
return rv
def _r(
self,
client: FlaskClient,
route: str,
method: str = "GET",
response_code: int = 200,
*args: Any,
**kwargs: Any,
) -> "TestResponse":
"""
Helper function to test endpoint functionality with authentication
"""
rv = client.open(
route, method=method, headers=self.auth_header, *args, **kwargs
)
self.assertStatus(rv, response_code)
return rv
_bad_json: List[Any] = [None, [], "test", 1, 0, True, False, {}]
_initialUsers = [
{
"username": "u1",
"id": "u1",
"keycloak": True,
"keycloak_id": "u1",
"enabled": True,
"email": "u1@u.com",
"first": "u",
"firstname": "u",
"last": "1",
"lastname": "1",
"roles": [MANAGER],
"keycloak_groups": ["group1"],
"groups": ["group1", MANAGER],
"moodle": True,
"moodle_id": "u1",
"moodle_groups": [],
"nextcloud": True,
"nextcloud_id": "u1",
"nextcloud_groups": [],
"quota": False,
"quota_used_bytes": False,
},
]
_initialGroups = [
{"keycloak": True, "id": r, "name": r, "path": r}
for r in ["group1", MANAGER, STUDENT, TEACHER]
]
_initialRoles = [
{"id": r, "name": f"NAME {r}", "description": f"DESC {r}"}
for r in [MANAGER, STUDENT, TEACHER]
]
def _feedInitialData(self) -> None:
self.app.admin.internal["users"] = copy.deepcopy(self._initialUsers)
self.app.admin.internal["groups"] = copy.deepcopy(self._initialGroups)
self.app.admin.internal["roles"] = copy.deepcopy(self._initialRoles)
def _canonicUserData(self, users: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return [user_parser(u) for u in users]
def _canonicGroupData(self, groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return [group_parser(g) for g in groups]
def _canonicRoleData(self, roles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return [role_parser(r) for r in roles]
#
# Actual tests
#
# /ddapi/users
def test_users_401(self, client: FlaskClient) -> None:
for m in ["GET"]:
self._ur(client, "/ddapi/users", method=m)
def test_users_405(self, client: FlaskClient) -> None:
for m in ["PUT", "DELETE", "POST"]:
self._ur(client, "/ddapi/users", method=m, response_code=405)
self._r(client, "/ddapi/users", method=m, response_code=405)
def test_users_GET(self, client: FlaskClient) -> None:
"""Ensure GETting users makes sense"""
rv = self._r(client, "/ddapi/users")
# When empty, it should return an empty list
self.assertEqual([], rv.json, msg="Expected empty list")
# When populated, it should return the test data
self._feedInitialData()
rv = self._r(client, "/ddapi/users")
self.assertEqual(
self._canonicUserData(self._initialUsers),
rv.json,
msg="Unexpected data received",
)
# /ddapi/users/filter
# Searches in username, first, last, email
def test_users_filter_401(self, client: FlaskClient) -> None:
for m in ["POST"]:
self._ur(client, "/ddapi/users/filter", method=m)
def test_users_filter_405(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE", "PUT"]:
self._ur(client, "/ddapi/users/filter", method=m, response_code=405)
self._r(client, "/ddapi/users/filter", method=m, response_code=405)
def test_users_filter_POST(self, client: FlaskClient) -> None:
rv = self._r(client, "/ddapi/users/filter", method="POST", response_code=400)
for bad_j in self._bad_json:
rv = self._r(
client,
"/ddapi/users/filter",
method="POST",
response_code=400,
json=bad_j,
)
rv = self._r(client, "/ddapi/users/filter", method="POST", json={"text": "u1"})
self.assertEqual([], rv.json, "Unexpected data received")
self._feedInitialData()
rv = self._r(client, "/ddapi/users/filter", method="POST", json={"text": "u1"})
self.assertEqual(
self._canonicUserData(self._initialUsers),
rv.json,
"Unexpected data received",
)
rv = self._r(
client,
"/ddapi/users/filter",
method="POST",
json={"text": "SEARCHING_FOR_NO_MATCH"},
)
self.assertEqual([], rv.json, "Unexpected data received")
# /ddapi/groups
def test_groups_401(self, client: FlaskClient) -> None:
for m in ["GET"]:
self._ur(client, "/ddapi/groups", method=m)
def test_groups_405(self, client: FlaskClient) -> None:
for m in ["POST", "PUT", "DELETE"]:
self._ur(client, "/ddapi/groups", method=m, response_code=405)
self._r(client, "/ddapi/groups", method=m, response_code=405)
def test_groups_GET(self, client: FlaskClient) -> None:
rv = self._r(client, "/ddapi/groups")
self.assertEqual([], rv.json, "Unexpected data received")
self._feedInitialData()
rv = self._r(client, "/ddapi/groups")
self.assertEqual(
self._canonicGroupData(self._initialGroups),
rv.json,
"Unexpected data received",
)
# /ddapi/roles
def test_roles_401(self, client: FlaskClient) -> None:
for m in ["GET"]:
self._ur(client, "/ddapi/roles", method=m)
def test_roles_405(self, client: FlaskClient) -> None:
for m in ["POST", "DELETE", "PUT"]:
self._ur(client, "/ddapi/roles", method=m, response_code=405)
self._r(client, "/ddapi/roles", method=m, response_code=405)
def test_roles_GET(self, client: FlaskClient) -> None:
rv = self._r(client, "/ddapi/roles")
self.assertEqual([], rv.json, "Unexpected data received")
self._feedInitialData()
rv = self._r(client, "/ddapi/roles")
self.assertEqual(
self._canonicRoleData(self._initialRoles),
rv.json,
"Unexpected data received",
)
# /ddapi/role/users
# - id|name|keycloak_id <-- used in this order, equivalent
def test_role_users_405(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE", "PUT"]:
self._ur(client, "/ddapi/role/users", method=m, response_code=405)
self._r(client, "/ddapi/role/users", method=m, response_code=405)
def test_role_users_401(self, client: FlaskClient) -> None:
for m in ["POST"]:
self._ur(client, "/ddapi/role/users", method=m)
def test_role_users_POST(self, client: FlaskClient) -> None:
rv = self._r(client, "/ddapi/role/users", method="POST", response_code=400)
for bad_j in self._bad_json:
rv = self._r(
client,
"/ddapi/role/users",
method="POST",
response_code=400,
json=bad_j,
)
rv = self._r(client, "/ddapi/role/users", method="POST", json={"id": MANAGER})
self.assertEqual([], rv.json, "Unexpected data received")
self._feedInitialData()
rv = self._r(client, "/ddapi/role/users", method="POST", json={"id": MANAGER})
self.assertEqual(
self._canonicUserData(self._initialUsers),
rv.json,
"Unexpected data received",
)
rv = self._r(client, "/ddapi/role/users", method="POST", json={"id": TEACHER})
self.assertEqual([], rv.json, "Unexpected data received")
# /ddapi/user[/<user_ddid>]
def test_user_401(self, client: FlaskClient) -> None:
for m in ["POST"]:
self._ur(client, "/ddapi/user", method=m)
def test_user_405(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE", "PUT"]:
self._ur(client, "/ddapi/user", method=m, response_code=405)
self._r(client, "/ddapi/user", method=m, response_code=405)
def test_user_POST(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user", method="POST", response_code=400)
for bad_j in self._bad_json:
self._r(client, "/ddapi/user", method="POST", response_code=400, json=bad_j)
new_user = {
"email": "a",
"enabled": True,
"password": "PASS",
"quota": "",
"username": "cu1",
"first": "FIRSTNAME",
"last": "LASTNAME",
"role": MANAGER,
"groups": ["group2"],
}
self._r(client, "/ddapi/user", method="POST", response_code=400, json=new_user)
self.assertNotIn(
"cu1", [u["id"] for u in self.app.admin.internal["users"]], "Created user"
)
new_user["email"] = "a@a.example"
self._r(client, "/ddapi/user", method="POST", response_code=412, json=new_user)
self.assertNotIn(
"cu1", [u["id"] for u in self.app.admin.internal["users"]], "Created user"
)
self._feedInitialData()
self._r(client, "/ddapi/user", method="POST", response_code=412, json=new_user)
self.assertNotIn(
"cu1", [u["id"] for u in self.app.admin.internal["users"]], "Created user"
)
new_user["groups"] = ["group1"]
self._r(client, "/ddapi/user", method="POST", response_code=200, json=new_user)
self.assertIn(
"cu1",
[u["id"] for u in self.app.admin.internal["users"]],
"Did not create user",
)
def test_user_ddid_GET_401(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE", "PUT"]:
self._ur(client, "/ddapi/user/u1", method=m)
def test_user_ddid_GET_405(self, client: FlaskClient) -> None:
for m in ["POST"]:
self._ur(client, "/ddapi/user/u1", method=m, response_code=405)
self._r(client, "/ddapi/user/u1", method=m, response_code=405)
def test_user_ddid_GET(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user/", response_code=405)
self._r(client, "/ddapi/user/u1", response_code=404)
self._feedInitialData()
rv = self._r(client, "/ddapi/user/u1", response_code=200)
self.assertEqual("u1", rv.json["username"])
def test_user_ddid_DELETE(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user/u1", method="DELETE", response_code=404)
self._feedInitialData()
rv = self._r(client, "/ddapi/users")
self.assertNotEqual(rv.json, [])
self._r(client, "/ddapi/user/u1", method="DELETE")
rv = self._r(client, "/ddapi/users")
self.assertEqual(rv.json, [])
rv = self._r(client, "/ddapi/user/u1", response_code=404)
def test_user_ddid_PUT(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user/u1", method="PUT", response_code=404)
self._feedInitialData()
rv = self._r(client, "/ddapi/user/u1")
prev_u = rv.json
self.assertEqual(prev_u["first"], "u")
self._r(client, "/ddapi/user/u1", method="PUT", response_code=400)
for bad_j in self._bad_json:
self._r(
client, "/ddapi/user/u1", method="PUT", response_code=400, json=bad_j
)
rv = self._r(client, "/ddapi/user/u1")
self.assertEqual(prev_u, rv.json)
self._r(
client,
"/ddapi/user/u1",
method="PUT",
response_code=200,
json={"first": "U"},
)
rv = self._r(client, "/ddapi/user/u1")
new_u = rv.json
self.assertNotEqual(prev_u, rv.json)
self.assertEqual(rv.json["first"], "U")
prev_u = new_u
self._r(
client,
"/ddapi/user/u1",
method="PUT",
response_code=200,
json={"first": "U", "password": "TEST2"},
)
# /ddapi/username/OLD/NEW
def test_username_401(self, client: FlaskClient) -> None:
for m in ["PUT"]:
self._ur(client, "/ddapi/username/a/b", method=m)
def test_username_405(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE", "POST"]:
self._ur(client, "/ddapi/username/a/b", method=m, response_code=405)
self._r(client, "/ddapi/username/a/b", method=m, response_code=405)
def test_username_PUT(self, client: FlaskClient) -> None:
rv = self._r(client, "/ddapi/users")
self.assertEqual([], rv.json)
self._r(client, "/ddapi/username/u1/u2", method="PUT", response_code=404)
self._feedInitialData()
rv = self._r(client, "/ddapi/users")
self.assertIn("u1", [u["id"] for u in rv.json])
self._r(client, "/ddapi/username/u1/u2", method="PUT", response_code=501)
# /ddapi/group
# /ddapi/group/<group_id>
def test_group_401(self, client: FlaskClient) -> None:
for m in ["POST"]:
self._ur(client, "/ddapi/group", method=m)
def test_group_405(self, client: FlaskClient) -> None:
for m in ["GET", "PUT", "DELETE"]:
self._ur(client, "/ddapi/group", method=m, response_code=405)
self._r(client, "/ddapi/group", method=m, response_code=405)
def test_group_POST(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/group", method="POST", response_code=400)
for bad_j in self._bad_json:
self._r(
client, "/ddapi/group", method="POST", response_code=400, json=bad_j
)
rv = self._r(client, "/ddapi/groups")
self.assertEqual([], rv.json)
self._r(client, "/ddapi/group", method="POST", json={"name": "group2"})
rv = self._r(client, "/ddapi/groups")
self.assertIn("group2", [g["name"] for g in rv.json])
# TODO: Check what this is supposed to do
# self._r(client, "/ddapi/group", method='POST', response_code=409,
# json={"name": "group2"})
def test_group_gid_401(self, client: FlaskClient) -> None:
for m in ["GET", "POST", "DELETE"]:
self._ur(client, "/ddapi/group/group2", method=m)
def test_group_gid_405(self, client: FlaskClient) -> None:
for m in ["PUT"]:
self._ur(client, "/ddapi/group/group2", method=m, response_code=405)
self._r(client, "/ddapi/group/group2", method=m, response_code=405)
def test_group_gid_GET(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/group/group2", response_code=404)
self._r(client, "/ddapi/group", method="POST", json={"name": "group2"})
rv = self._r(client, "/ddapi/group/group2")
self.assertEqual("group2", rv.json["name"])
def test_group_gid_POST(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/group/group2", method="POST", response_code=400)
for bad_j in self._bad_json:
self._r(
client,
"/ddapi/group/group2",
method="POST",
response_code=400,
json=bad_j,
)
self._r(client, "/ddapi/group/group2", response_code=404)
self._r(client, "/ddapi/group/group2", method="POST", json={"name": "group2"})
rv = self._r(client, "/ddapi/groups")
self.assertIn("group2", [g["name"] for g in rv.json])
rv = self._r(client, "/ddapi/group/group2")
self.assertEqual("group2", rv.json["name"])
self._r(
client,
"/ddapi/group/group2",
method="POST",
response_code=409,
json={"name": "group2"},
)
def test_group_gid_DELETE(self, client: FlaskClient) -> None:
rv = self._r(client, "/ddapi/groups")
self.assertEqual([], rv.json)
self._r(client, "/ddapi/group/group2", method="DELETE", response_code=404)
self._r(client, "/ddapi/group/group2", method="POST", json={"name": "group2"})
rv = self._r(client, "/ddapi/groups")
self.assertIn("group2", [g["name"] for g in rv.json])
self._r(client, "/ddapi/group/group2", method="DELETE", response_code=200)
rv = self._r(client, "/ddapi/groups")
self.assertEqual([], rv.json)
# /ddapi/user_mail
# /ddapi/user_mail/<id>
def test_user_mail_401(self, client: FlaskClient) -> None:
for m in ["POST"]:
self._ur(client, "/ddapi/user_mail", method=m)
def test_user_mail_405(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE", "PUT"]:
self._ur(client, "/ddapi/user_mail", method=m, response_code=405)
self._r(client, "/ddapi/user_mail", method=m, response_code=405)
def test_user_mail_POST(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user_mail", method="POST", response_code=400)
for bad_j in self._bad_json:
self._r(
client, "/ddapi/user_mail", method="POST", response_code=400, json=bad_j
)
us = [{"user_id": "u1", "email": "a"}]
self._r(client, "/ddapi/user_mail", method="POST", response_code=400, json=us)
def test_user_mail_id_401(self, client: FlaskClient) -> None:
for m in ["GET", "DELETE"]:
self._ur(client, "/ddapi/user_mail/id", method=m)
def test_user_mail_id_405(self, client: FlaskClient) -> None:
for m in ["POST", "PUT"]:
self._ur(client, "/ddapi/user_mail/id", method=m, response_code=405)
self._r(client, "/ddapi/user_mail/id", method=m, response_code=405)
def test_user_mail_id_GET(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user_mail/u1", response_code=501)
def test_user_mail_id_DELETE(self, client: FlaskClient) -> None:
self._r(client, "/ddapi/user_mail/u1", method="DELETE", response_code=501)
# /ddapi/group/users
# --> deleted code, was never used
# def test_group_users_GET_unauth(self, client: FlaskClient) -> None:
# return self._ur(
# client, "/ddapi/group/users"
# )
# def test_group_users_GET(self, client: FlaskClient) -> None:
# rv = self._r(client, "/ddapi/group/users", response_code=405)
# def test_get_role_users(self, client: FlaskClient) -> None:
# rv = self._r(client, "/ddapi/role/users", method="POST")
# print(rv)
# print(rv.json)