[sso-admin] Disentangle module and add type hints

With this commit, code from the admin module can be re-used and thanks
to adding type-hints in most places we are able to discover some bugs.

This commit attempts to fix only that which was necessary to:

- Add a reasonable amount of type hints
- Disentangle the module

There are already some issues that have been discovered by mypy.
Xnet-DigitalDemocratic-main-patch-41273
Evilham 2022-07-29 14:02:49 +02:00
parent e98323913d
commit 81fff214d5
No known key found for this signature in database
GPG Key ID: AE3EE30D970886BF
27 changed files with 1773 additions and 1664 deletions

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -20,107 +21,23 @@
import logging as log
import os
import os.path
from flask import Flask, render_template, send_from_directory
from admin.flaskapp import AdminFlaskApp
app = Flask(__name__, static_url_path="")
app = Flask(__name__, template_folder="static/templates")
app.url_map.strict_slashes = False
def get_app() -> AdminFlaskApp:
app = AdminFlaskApp(__name__, template_folder="static/templates")
"""
App secret key for encrypting cookies
You can generate one with:
import os
os.urandom(24)
And paste it here.
"""
app.secret_key = "Change this key!/\xf7\x83\xbe\x17\xfa\xa3zT\n\\]m\xa6\x8bF\xdd\r\xf7\x9e\x1d\x1f\x14'"
"""
Debug should be removed on production!
"""
if app.debug:
log.warning("Debug mode: {}".format(app.debug))
else:
log.info("Debug mode: {}".format(app.debug))
print("Starting dd-sso api...")
return app
from admin.lib.load_config import loadConfig
try:
loadConfig(app)
except:
print("Could not get environment variables...")
from admin.lib.postup import Postup
Postup()
from admin.lib.admin import Admin
app.admin = Admin()
app.ready = False
"""
Debug should be removed on production!
"""
if app.debug:
log.warning("Debug mode: {}".format(app.debug))
else:
log.info("Debug mode: {}".format(app.debug))
"""
Serve static files
"""
@app.route("/build/<path:path>")
def send_build(path):
return send_from_directory(
os.path.join(app.root_path, "node_modules/gentelella/build"), path
)
@app.route("/vendors/<path:path>")
def send_vendors(path):
return send_from_directory(
os.path.join(app.root_path, "node_modules/gentelella/vendors"), path
)
@app.route("/node_modules/<path:path>")
def send_nodes(path):
return send_from_directory(os.path.join(app.root_path, "node_modules"), path)
@app.route("/templates/<path:path>")
def send_templates(path):
return send_from_directory(os.path.join(app.root_path, "templates"), path)
# @app.route('/templates/<path:path>')
# def send_templates(path):
# return send_from_directory(os.path.join(app.root_path, 'static/templates'), path)
@app.route("/static/<path:path>")
def send_static_js(path):
return send_from_directory(os.path.join(app.root_path, "static"), path)
@app.route("/avatars/<path:path>")
def send_avatars_img(path):
return send_from_directory(
os.path.join(app.root_path, "../avatars/master-avatars"), path
)
@app.route("/custom/<path:path>")
def send_custom(path):
return send_from_directory(os.path.join(app.root_path, "../custom"), path)
# @app.errorhandler(404)
# def not_found_error(error):
# return render_template('page_404.html'), 404
# @app.errorhandler(500)
# def internal_error(error):
# return render_template('page_500.html'), 500
"""
Import all views

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -21,13 +22,9 @@ import os
from flask_login import LoginManager, UserMixin
from admin import app
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
from typing import TYPE_CHECKING, Dict
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
ram_users = {
os.environ["ADMINAPP_USER"]: {
@ -49,13 +46,19 @@ ram_users = {
class User(UserMixin):
def __init__(self, dict):
self.id = dict["id"]
self.username = dict["id"]
self.password = dict["password"]
self.role = dict["role"]
def __init__(self, id : str, password : str, role : str, active : bool = True) -> None:
self.id = id
self.username = id
self.password = password
self.role = role
self.active = active
def setup_auth(app : "AdminFlaskApp") -> None:
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "login"
@login_manager.user_loader
def user_loader(username):
return User(ram_users[username])
@login_manager.user_loader
def user_loader(username : str) -> User:
u = ram_users[username]
return User(id = u["id"], password = u["password"], role = u["role"])

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -30,17 +31,18 @@ from functools import wraps
from flask import request
from jose import jwt
import jose.exceptions
from admin import app
from typing import Any
from ..lib.api_exceptions import Error
from admin.lib.api_exceptions import Error
def get_header_jwt_payload():
def get_header_jwt_payload() -> Any:
return get_token_payload(get_token_auth_header())
def get_token_header(header):
def get_token_header(header : str) -> str:
"""Obtains the Access Token from the a Header"""
auth = request.headers.get(header, None)
if not auth:
@ -70,15 +72,15 @@ def get_token_header(header):
return parts[1] # Token
def get_token_auth_header():
def get_token_auth_header() -> str:
return get_token_header("Authorization")
def get_token_payload(token):
def get_token_payload(token : str) -> Any:
# log.warning("The received token in get_token_payload is: " + str(token))
try:
claims = jwt.get_unverified_claims(token)
secret = app.config["API_SECRET"]
secret = os.environ["API_SECRET"]
except:
log.warning(
@ -97,11 +99,11 @@ def get_token_payload(token):
algorithms=["HS256"],
options=dict(verify_aud=False, verify_sub=False, verify_exp=True),
)
except jwt.ExpiredSignatureError:
except jose.exceptions.ExpiredSignatureError:
log.warning("Token expired")
raise Error("unauthorized", "Token is expired", traceback.format_stack())
except jwt.JWTClaimsError:
except jose.exceptions.JWTClaimsError:
raise Error(
"unauthorized",
"Incorrect claims, please check the audience and issuer",

View File

@ -0,0 +1,225 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
# DD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# DD is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import logging as log
import os
import os.path
import secrets
import traceback
from typing import TYPE_CHECKING, Any, Callable, Dict
import yaml
from cerberus import Validator
from flask import Flask, Response, jsonify, render_template, send_from_directory
from admin.lib.api_exceptions import Error
from admin.views.decorators import OptionalJsonResponse
from admin.views.ApiViews import setup_api_views
from admin.views.AppViews import setup_app_views
from admin.views.LoginViews import setup_login_views
from admin.views.WebViews import setup_web_views
from admin.views.WpViews import setup_wp_views
from admin.auth.authentication import setup_auth
if TYPE_CHECKING:
from admin.lib.admin import Admin
from admin.lib.postup import Postup
class AdminValidator(Validator): # type: ignore # cerberus type hints MIA
# TODO: What's the point of this class?
None
# def _normalize_default_setter_genid(self, document):
# return _parse_string(document["name"])
# def _normalize_default_setter_genidlower(self, document):
# return _parse_string(document["name"]).lower()
# def _normalize_default_setter_gengroupid(self, document):
# return _parse_string(
# document["parent_category"] + "-" + document["uid"]
# ).lower()
class AdminFlaskApp(Flask):
"""
Subclass Flask app to ease customisation and type checking.
In order for an instance of this class to be useful,
the setup method should be called after instantiating.
"""
admin: "Admin"
secrets_dir: str
ready: bool = False
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.url_map.strict_slashes = False
from admin.lib.admin import Admin
self.admin = Admin(self)
# Minor setup tasks
self._load_validators()
self._load_config()
self._setup_routes()
setup_api_views(self)
setup_app_views(self)
setup_login_views(self)
setup_web_views(self)
setup_wp_views(self)
setup_auth(self)
@property
def legal_path(self) -> str:
return os.path.join(self.root_path, "static/templates/pages/legal/")
@property
def avatars_path(self) -> str:
return os.path.join(self.root_path, "../custom/avatars/")
def setup(self) -> None:
"""
Perform setup tasks that might do network
"""
from admin.lib.postup import Postup
Postup(self)
def json_route(self, rule: str, **options: Any) -> Callable[..., OptionalJsonResponse]:
return self.route(rule, **options) # type: ignore # mypy issue #7187
def _load_validators(self, purge_unknown: bool = True) -> Dict[str, Validator]:
validators = {}
schema_path = os.path.join(self.root_path, "schemas")
for schema_filename in os.listdir(schema_path):
try:
with open(os.path.join(schema_path, schema_filename)) as file:
schema_yml = file.read()
schema = yaml.load(schema_yml, Loader=yaml.FullLoader)
validators[schema_filename.split(".")[0]] = AdminValidator(
schema, purge_unknown=purge_unknown
)
except IsADirectoryError:
None
return validators
def _load_config(self) -> None:
try:
# Handle secrets like Flask's session key
self.secrets_dir = os.environ.get("SECRETS", "secret")
secret_key_file = os.path.join(self.secrets_dir, "secret_key")
if not os.path.exists(self.secrets_dir):
os.mkdir(self.secrets_dir)
if not os.path.exists(secret_key_file):
# Generate as needed
# https://flask.palletsprojects.com/en/2.1.x/config/#SECRET_KEY
with os.fdopen(
os.open(secret_key_file, os.O_WRONLY | os.O_CREAT, 0o600), "w"
) as f:
f.write(secrets.token_hex())
self.secret_key = open(secret_key_file, "r").read()
# Move on with ISARD's settings
self.config.setdefault("DOMAIN", os.environ["DOMAIN"])
self.config.setdefault(
"KEYCLOAK_POSTGRES_USER", os.environ["KEYCLOAK_DB_USER"]
)
self.config.setdefault(
"KEYCLOAK_POSTGRES_PASSWORD", os.environ["KEYCLOAK_DB_PASSWORD"]
)
self.config.setdefault(
"MOODLE_POSTGRES_USER", os.environ["MOODLE_POSTGRES_USER"]
)
self.config.setdefault(
"MOODLE_POSTGRES_PASSWORD", os.environ["MOODLE_POSTGRES_PASSWORD"]
)
self.config.setdefault(
"NEXTCLOUD_POSTGRES_USER", os.environ["NEXTCLOUD_POSTGRES_USER"]
)
self.config.setdefault(
"NEXTCLOUD_POSTGRES_PASSWORD", os.environ["NEXTCLOUD_POSTGRES_PASSWORD"]
)
self.config.setdefault(
"VERIFY", True if os.environ["VERIFY"] == "true" else False
)
self.config.setdefault("API_SECRET", os.environ.get("API_SECRET"))
except Exception as e:
log.error(traceback.format_exc())
raise
def _setup_routes(self) -> None:
"""
Setup routes to Serve static files
"""
@self.route("/build/<path:path>")
def send_build(path: str) -> Response:
return send_from_directory(
os.path.join(self.root_path, "node_modules/gentelella/build"), path
)
@self.route("/vendors/<path:path>")
def send_vendors(path: str) -> Response:
return send_from_directory(
os.path.join(self.root_path, "node_modules/gentelella/vendors"), path
)
@self.route("/node_modules/<path:path>")
def send_nodes(path: str) -> Response:
return send_from_directory(
os.path.join(self.root_path, "node_modules"), path
)
@self.route("/templates/<path:path>")
def send_templates(path: str) -> Response:
return send_from_directory(os.path.join(self.root_path, "templates"), path)
# @self.route('/templates/<path:path>')
# def send_templates(path):
# return send_from_directory(os.path.join(self.root_path, 'static/templates'), path)
@self.route("/static/<path:path>")
def send_static_js(path: str) -> Response:
return send_from_directory(os.path.join(self.root_path, "static"), path)
@self.route("/avatars/<path:path>")
def send_avatars_img(path: str) -> Response:
return send_from_directory(
os.path.join(self.root_path, "../avatars/master-avatars"), path
)
@self.route("/custom/<path:path>")
def send_custom(path: str) -> Response:
return send_from_directory(os.path.join(self.root_path, "../custom"), path)
# @self.errorhandler(404)
# def not_found_error(error):
# return render_template('page_404.html'), 404
# @self.errorhandler(500)
# def internal_error(error):
# return render_template('page_500.html'), 500
@self.errorhandler(Error)
def handle_user_error(ex : Error) -> Response:
response : Response = jsonify(ex.error)
response.status_code = ex.status_code
response.headers.extend(ex.content_type) # type: ignore # werkzeug type hint MIA
return response

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -26,8 +27,6 @@ from time import sleep
import diceware
from admin import app
from .avatars import Avatars
from .helpers import (
filter_roles_list,
@ -61,14 +60,27 @@ from .helpers import (
rand_password,
)
from typing import TYPE_CHECKING, cast, Any, Dict, Iterable, List, Optional
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
MANAGER = os.environ["CUSTOM_ROLE_MANAGER"]
TEACHER = os.environ["CUSTOM_ROLE_TEACHER"]
STUDENT = os.environ["CUSTOM_ROLE_STUDENT"]
DDUser = Dict[str, Any]
DDGroup = Dict[str, Any]
DDRole = Dict[str, Any]
class Admin:
def __init__(self):
self.check_connections()
app : "AdminFlaskApp"
internal : Dict[str, Any]
external : Dict[str, Any]
def __init__(self, app : "AdminFlaskApp") -> None:
self.app = app
self.check_connections(app)
self.set_custom_roles()
self.overwrite_admins()
@ -90,13 +102,13 @@ class Admin:
self.external = {"users": [], "groups": [], "roles": []}
log.warning(" Updating missing user avatars with defaults")
self.av = Avatars()
self.av = Avatars(app.avatars_path)
# av.minio_delete_all_objects() # This will reset all avatars on usres
self.av.update_missing_avatars(self.internal["users"])
log.warning(" SYSTEM READY TO HANDLE CONNECTIONS")
def check_connections(self):
def check_connections(self, app : "AdminFlaskApp") -> None:
ready = False
while not ready:
try:
@ -111,7 +123,7 @@ class Admin:
ready = False
while not ready:
try:
self.moodle = Moodle(verify=app.config["VERIFY"])
self.moodle = Moodle(app)
ready = True
except:
log.error("Could not connect to moodle, waiting to be online...")
@ -136,18 +148,18 @@ class Admin:
ready = False
while not ready:
try:
self.nextcloud = Nextcloud(verify=app.config["VERIFY"])
self.nextcloud = Nextcloud(verify=app.config["VERIFY"], app=app)
ready = True
except:
log.error("Could not connect to nextcloud, waiting to be online...")
sleep(2)
log.warning("Nextcloud connected.")
def set_custom_roles(self):
def set_custom_roles(self) -> None:
pass
## This function should be moved to postup.py
def overwrite_admins(self):
def overwrite_admins(self) -> None:
log.warning("Setting defaults...")
dduser = os.environ["DDADMIN_USER"]
ddpassword = os.environ["DDADMIN_PASSWORD"]
@ -223,7 +235,7 @@ class Admin:
log.error(traceback.format_exc())
exit(1)
def default_setup(self):
def default_setup(self) -> None:
### Add default roles
try:
log.warning("KEYCLOAK: Adding default roles")
@ -324,7 +336,7 @@ class Admin:
# except:
# log.warning("KEYCLOAK: Seems to be there already")
def resync_data(self):
def resync_data(self) -> bool:
self.internal = {
"users": self._get_mix_users(),
"groups": self._get_mix_groups(),
@ -332,7 +344,7 @@ class Admin:
}
return True
def get_moodle_users(self):
def get_moodle_users(self) -> List[Any]:
return [
u
for u in self.moodle.get_users_with_groups_and_roles()
@ -353,7 +365,7 @@ class Admin:
# "roles": u['roles']}
# for u in users]
def get_keycloak_users(self):
def get_keycloak_users(self) -> List[DDUser]:
# log.warning('Loading keycloak users... can take a long time...')
users = self.keycloak.get_users_with_groups_and_roles()
@ -372,7 +384,7 @@ class Admin:
if not system_username(u["username"])
]
def get_nextcloud_users(self):
def get_nextcloud_users(self) -> List[DDUser]:
return [
{
"id": u["username"],
@ -414,11 +426,11 @@ class Admin:
# "roles": []})
# return users_list
def get_mix_users(self):
sio_event_send("get_users", {"you_win": "you got the users!"})
def get_mix_users(self) -> Any:
sio_event_send(self.app, "get_users", {"you_win": "you got the users!"})
return self.internal["users"]
def _get_mix_users(self):
def _get_mix_users(self) -> List[DDUser]:
kgroups = self.keycloak.get_groups()
kusers = self.get_keycloak_users()
@ -481,32 +493,32 @@ class Admin:
users.append(theuser)
return users
def get_roles(self):
def get_roles(self) -> Any:
return self.internal["roles"]
def _get_roles(self):
def _get_roles(self) -> List[DDRole]:
return filter_roles_listofdicts(self.keycloak.get_roles())
def get_group_by_name(self, group_name):
def get_group_by_name(self, group_name : str) -> Any:
group = [g for g in self.internal["groups"] if g["name"] == group_name]
return group[0] if len(group) else False
def get_keycloak_groups(self):
def get_keycloak_groups(self) -> Any:
log.warning("Loading keycloak groups...")
return self.keycloak.get_groups()
def get_moodle_groups(self):
def get_moodle_groups(self) -> Any:
log.warning("Loading moodle groups...")
return self.moodle.get_cohorts()
def get_nextcloud_groups(self):
def get_nextcloud_groups(self) -> Any:
log.warning("Loading nextcloud groups...")
return self.nextcloud.get_groups_list()
def get_mix_groups(self):
def get_mix_groups(self) -> Any:
return self.internal["groups"]
def _get_mix_groups(self):
def _get_mix_groups(self) -> List[Dict[str, Any]]:
kgroups = self.get_keycloak_groups()
mgroups = self.get_moodle_groups()
ngroups = self.get_nextcloud_groups()
@ -564,7 +576,7 @@ class Admin:
groups.append(thegroup)
return groups
def sync_groups_from_keycloak(self):
def sync_groups_from_keycloak(self) -> None:
self.resync_data()
for group in self.internal["groups"]:
if not group["keycloak"]:
@ -586,22 +598,22 @@ class Admin:
self.nextcloud.add_group(group["name"])
self.resync_data()
def get_external_users(self):
def get_external_users(self) -> Any:
return self.external["users"]
def get_external_groups(self):
def get_external_groups(self) -> Any:
return self.external["groups"]
def get_external_roles(self):
def get_external_roles(self) -> Any:
return self.external["roles"]
def upload_csv_ug(self, data):
def upload_csv_ug(self, data : Dict[str, Any]) -> bool:
log.warning("Processing uploaded users...")
users = []
total = len(data["data"])
item = 1
ev = Events("Processing uploaded users", total=len(data["data"]))
groups = []
ev = Events(self.app, "Processing uploaded users", total=len(data["data"]))
groups : List[str] = []
for u in data["data"]:
log.warning(
"Processing ("
@ -680,18 +692,18 @@ class Admin:
self.external["groups"] = sysgroups
return True
def get_dice_pwd(self):
return diceware.get_passphrase(options=options)
def get_dice_pwd(self) -> str:
return cast(str, diceware.get_passphrase(options=options))
def reset_external(self):
def reset_external(self) -> bool:
self.external = {"users": [], "groups": [], "roles": []}
return True
def upload_json_ga(self, data):
def upload_json_ga(self, data : Dict[str, Any]) -> bool:
groups = []
log.warning("Processing uploaded groups...")
try:
ev = Events(
ev = Events(self.app,
"Processing uploaded groups",
"Group:",
total=len(data["data"]["groups"]),
@ -718,7 +730,7 @@ class Admin:
users = []
total = len(data["data"]["users"])
item = 1
ev = Events(
ev = Events(self.app,
"Processing uploaded users",
"User:",
total=len(data["data"]["users"]),
@ -757,7 +769,7 @@ class Admin:
u["groups"] = u["groups"] + [g["name"]]
return True
def sync_external(self, ids):
def sync_external(self, ids : Any) -> None:
# self.resync_data()
log.warning("Starting sync to keycloak")
self.sync_to_keycloak_external()
@ -769,10 +781,10 @@ class Admin:
log.warning("All syncs finished. Resyncing from apps...")
self.resync_data()
def add_keycloak_groups(self, groups):
def add_keycloak_groups(self, groups : List[Any]) -> None:
total = len(groups)
i = 0
ev = Events(
ev = Events(self.app,
"Syncing import groups to keycloak", "Adding group:", total=len(groups)
)
for g in groups:
@ -790,8 +802,8 @@ class Admin:
def sync_to_keycloak_external(
self,
): ### This one works from the external, moodle and nextcloud from the internal
groups = []
) -> None: ### This one works from the external, moodle and nextcloud from the internal
groups : List[DDGroup] = []
for u in self.external["users"]:
groups = groups + u["groups"]
groups = list(dict.fromkeys(groups))
@ -800,7 +812,7 @@ class Admin:
total = len(self.external["users"])
index = 0
ev = Events(
ev = Events(self.app,
"Syncing import users to keycloak",
"Adding user:",
total=len(self.external["users"]),
@ -855,11 +867,11 @@ class Admin:
u["groups"].append(u["roles"][0])
self.resync_data()
def add_moodle_groups(self, groups):
def add_moodle_groups(self, groups : List[Any]) -> None:
### Create all groups. Skip / in system groups
total = len(groups)
log.warning(groups)
ev = Events("Syncing groups from external to moodle", total=len(groups))
ev = Events(self.app, "Syncing groups from external to moodle", total=len(groups))
i = 1
for g in groups:
moodle_groups = kpath2gids(g)
@ -880,9 +892,9 @@ class Admin:
)
i = i + 1
def sync_to_moodle_external(self): # works from the internal (keycloak)
def sync_to_moodle_external(self) -> None: # works from the internal (keycloak)
### Process all groups from the users keycloak_groups key
groups = []
groups : List[DDGroup] = []
for u in self.external["users"]:
groups = groups + u["groups"]
groups = list(dict.fromkeys(groups))
@ -893,7 +905,7 @@ class Admin:
cohorts = self.moodle.get_cohorts()
### Create users in moodle
ev = Events(
ev = Events(self.app,
"Syncing users from external to moodle", total=len(self.internal["users"])
)
for u in self.external["users"]:
@ -920,7 +932,7 @@ class Admin:
# self.resync_data()
### Add user to their cohorts (groups)
ev = Events(
ev = Events(self.app,
"Syncing users groups from external to moodle cohorts",
total=len(self.internal["users"]),
)
@ -938,16 +950,16 @@ class Admin:
log.error(self.moodle.get_user_by("username", u["username"]))
# self.resync_data()
def delete_all_moodle_cohorts(self):
def delete_all_moodle_cohorts(self) -> None:
cohorts = self.moodle.get_cohorts()
ids = [c["id"] for c in cohorts]
self.moodle.delete_cohorts(ids)
def add_nextcloud_groups(self, groups):
def add_nextcloud_groups(self, groups : List[Any]) -> None:
### Create all groups. Skip / in system groups
total = len(groups)
log.warning(groups)
ev = Events("Syncing groups from external to nextcloud", total=len(groups))
ev = Events(self.app, "Syncing groups from external to nextcloud", total=len(groups))
i = 1
for g in groups:
nextcloud_groups = kpath2gids(g)
@ -968,15 +980,15 @@ class Admin:
)
i = i + 1
def sync_to_nextcloud_external(self):
groups = []
def sync_to_nextcloud_external(self) -> None:
groups : List[DDGroup] = []
for u in self.external["users"]:
groups = groups + u["gids"]
groups = list(dict.fromkeys(groups))
self.add_nextcloud_groups(groups)
ev = Events(
ev = Events(self.app,
"Syncing users from external to nextcloud",
total=len(self.internal["users"]),
)
@ -1009,14 +1021,14 @@ class Admin:
except:
log.error(traceback.format_exc())
def sync_to_moodle(self): # works from the internal (keycloak)
def sync_to_moodle(self) -> None: # works from the internal (keycloak)
### Process all groups from the users keycloak_groups key
groups = []
groups : List[str] = []
for u in self.internal["users"]:
groups = groups + u["keycloak_groups"]
groups = list(dict.fromkeys(groups))
ev = Events("Syncing groups from keycloak to moodle", total=len(groups))
ev = Events(self.app, "Syncing groups from keycloak to moodle", total=len(groups))
pathslist = []
for group in groups:
pathpart = ""
@ -1040,7 +1052,7 @@ class Admin:
cohorts = self.moodle.get_cohorts()
### Create users in moodle
ev = Events(
ev = Events(self.app,
"Syncing users from keycloak to moodle", total=len(self.internal["users"])
)
for u in self.internal["users"]:
@ -1067,7 +1079,7 @@ class Admin:
self.resync_data()
ev = Events(
ev = Events(self.app,
"Syncing users with moodle cohorts", total=len(self.internal["users"])
)
cohorts = self.moodle.get_cohorts()
@ -1106,15 +1118,15 @@ class Admin:
self.resync_data()
def sync_to_nextcloud(self):
groups = []
def sync_to_nextcloud(self) -> None:
groups : List[str] = []
for u in self.internal["users"]:
groups = groups + u["keycloak_groups"]
groups = list(dict.fromkeys(groups))
total = len(groups)
i = 0
ev = Events("Syncing groups from keycloak to nextcloud", total=len(groups))
ev = Events(self.app, "Syncing groups from keycloak to nextcloud", total=len(groups))
for g in groups:
parts = g.split("/")
subpath = ""
@ -1137,7 +1149,7 @@ class Admin:
)
i = i + 1
ev = Events(
ev = Events(self.app,
"Syncing users from keycloak to nextcloud",
total=len(self.internal["users"]),
)
@ -1167,13 +1179,13 @@ class Admin:
except:
log.error(traceback.format_exc())
def delete_keycloak_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if len(user) and user[0]["keycloak"]:
user = user[0]
def delete_keycloak_user(self, userid : str) -> None:
users : List[DDUser] = [u for u in self.internal["users"] if u["id"] == userid]
if len(users) and users[0]["keycloak"]:
user = users[0]
keycloak_id = user["id"]
else:
return False
return
log.warning("Removing keycloak user: " + user["username"])
try:
self.keycloak.delete_user(keycloak_id)
@ -1183,10 +1195,10 @@ class Admin:
self.av.delete_user_avatar(userid)
def delete_keycloak_users(self):
def delete_keycloak_users(self) -> None:
total = len(self.internal["users"])
i = 0
ev = Events(
ev = Events(self.app,
"Deleting users from keycloak",
"Deleting user:",
total=len(self.internal["users"]),
@ -1217,13 +1229,13 @@ class Admin:
)
self.av.minio_delete_all_objects()
def delete_nextcloud_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if len(user) and user[0]["nextcloud"]:
user = user[0]
def delete_nextcloud_user(self, userid : str) -> None:
users : List[DDUser] = [u for u in self.internal["users"] if u["id"] == userid]
if len(users) and users[0]["nextcloud"]:
user = users[0]
nextcloud_id = user["nextcloud_id"]
else:
return False
return
log.warning("Removing nextcloud user: " + user["username"])
try:
self.nextcloud.delete_user(nextcloud_id)
@ -1231,8 +1243,8 @@ class Admin:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + user["username"])
def delete_nextcloud_users(self):
ev = Events("Deleting users from nextcloud", total=len(self.internal["users"]))
def delete_nextcloud_users(self) -> None:
ev = Events(self.app, "Deleting users from nextcloud", total=len(self.internal["users"]))
for u in self.internal["users"]:
if u["nextcloud"] and not u["keycloak"]:
@ -1246,13 +1258,13 @@ class Admin:
log.error(traceback.format_exc())
log.warning("Could not remove user: " + u["username"])
def delete_moodle_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if len(user) and user[0]["moodle"]:
user = user[0]
def delete_moodle_user(self, userid : str) -> None:
users : List[DDUser] = [u for u in self.internal["users"] if u["id"] == userid]
if len(users) and users[0]["moodle"]:
user = users[0]
moodle_id = user["moodle_id"]
else:
return False
return
log.warning("Removing moodle user: " + user["username"])
try:
self.moodle.delete_users([moodle_id])
@ -1260,7 +1272,7 @@ class Admin:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + user["username"])
def delete_moodle_users(self):
def delete_moodle_users(self, app : "AdminFlaskApp") -> None:
userids = []
usernames = []
for u in self.internal["users"]:
@ -1288,7 +1300,7 @@ class Admin:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + ",".join(usernames))
def delete_keycloak_groups(self):
def delete_keycloak_groups(self) -> None:
for g in self.internal["groups"]:
if not g["keycloak"]:
continue
@ -1302,7 +1314,7 @@ class Admin:
log.error(traceback.format_exc())
log.warning("Could not remove group: " + g["name"])
def external_roleassign(self, data):
def external_roleassign(self, data : Dict[str, Any]) -> bool:
for newuserid in data["ids"]:
for externaluser in self.external["users"]:
if externaluser["id"] == newuserid:
@ -1316,10 +1328,10 @@ class Admin:
externaluser["gids"].append(data["action"])
return True
def user_update_password(self, userid, password, password_temporary):
def user_update_password(self, userid : str, password : str, password_temporary : bool) -> Any:
return self.keycloak.update_user_pwd(userid, password, password_temporary)
def update_users_from_keycloak(self):
def update_users_from_keycloak(self) -> None:
kgroups = self.keycloak.get_groups()
users = [
{
@ -1339,15 +1351,15 @@ class Admin:
]
for user in users:
ev = Events(
ev = Events(self.app,
"Updating users from keycloak", "User:", total=len(users), table="users"
)
self.user_update(user)
ev.increment({"name": user["username"], "data": user["groups"]})
def user_update(self, user):
def user_update(self, user : DDUser) -> bool:
log.warning("Updating user moodle, nextcloud keycloak")
ev = Events("Updating user", "Updating user in keycloak")
ev = Events(self.app, "Updating user", "Updating user in keycloak")
## Get actual user role
try:
@ -1505,7 +1517,7 @@ class Admin:
ev.update_text("User updated")
return True
def update_keycloak_user(self, user_id, user, kdelete, kadd):
def update_keycloak_user(self, user_id : str, user : DDUser, kdelete : List[Any], kadd : List[Any]) -> bool:
# pprint(self.keycloak.get_user_realm_roles(user_id))
self.keycloak.remove_user_realm_roles(user_id, "student")
self.keycloak.assign_realm_roles(user_id, user["roles"][0])
@ -1521,24 +1533,24 @@ class Admin:
self.resync_data()
return True
def enable_users(self, data):
def enable_users(self, data : List[DDUser]) -> None:
# data={'id':'','username':''}
ev = Events("Bulk actions", "Enabling user:", total=len(data))
ev = Events(self.app, "Bulk actions", "Enabling user:", total=len(data))
for user in data:
ev.increment({"name": user["username"], "data": user["username"]})
self.keycloak.user_enable(user["id"])
self.resync_data()
def disable_users(self, data):
def disable_users(self, data : List[DDUser]) -> None:
# data={'id':'','username':''}
ev = Events("Bulk actions", "Disabling user:", total=len(data))
ev = Events(self.app, "Bulk actions", "Disabling user:", total=len(data))
for user in data:
ev.increment({"name": user["username"], "data": user["username"]})
self.keycloak.user_disable(user["id"])
self.resync_data()
def update_moodle_user(self, user_id, user, mdelete, madd):
internaluser = [u for u in self.internal["users"] if u["id"] == user_id][0]
def update_moodle_user(self, user_id : str, user : DDUser, mdelete : Iterable[Any], madd : Iterable[Any]) -> bool:
internaluser : DDUser = [u for u in self.internal["users"] if u["id"] == user_id][0]
cohorts = self.moodle.get_cohorts()
for group in mdelete:
cohort = [c for c in cohorts if c["name"] == group[0]]
@ -1576,29 +1588,29 @@ class Admin:
def add_moodle_user(
self,
username,
email,
first_name,
last_name,
password="*12" + secrets.token_urlsafe(16),
):
username : str,
email : str,
first_name : str,
last_name : str,
password : str="*12" + secrets.token_urlsafe(16),
) -> None:
log.warning("Creating moodle user: " + username)
ev = Events("Add user", username)
ev = Events(self.app, "Add user", username)
try:
self.moodle.create_user(email, username, password, first_name, last_name)
ev.update_text({"name": "Added to moodle", "data": []})
ev.update_text(str({"name": "Added to moodle", "data": []}))
except UserExists:
log.error(" -->> User already exists")
error = Events("User already exists.", str(se), type="error")
error = Events(self.app, "User already exists.", str(se), type="error")
except SystemError as se:
log.error("Moodle create user error: " + str(se))
error = Events("Moodle create user error", str(se), type="error")
error = Events(self.app, "Moodle create user error", str(se), type="error")
except:
log.error(" -->> Error creating on moodle the user: " + username)
print(traceback.format_exc())
error = Events("Internal error", "Check logs", type="error")
error = Events(self.app, "Internal error", "Check logs", type="error")
def update_nextcloud_user(self, user_id, user, ndelete, nadd):
def update_nextcloud_user(self, user_id : str, user : DDUser, ndelete : Iterable[Any], nadd : Iterable[Any]) -> None:
## TODO: Disable de user? Is really needed? it is disabled in keycloak, so can't login again
## ocs/v1.php/cloud/users/{userid}/disable
@ -1648,21 +1660,21 @@ class Admin:
def add_nextcloud_user(
self,
username,
email,
quota,
first_name,
last_name,
groups,
password="*12" + secrets.token_urlsafe(16),
):
username : str,
email : str,
quota : Any,
first_name : str,
last_name : str,
groups : str,
password : str = "*12" + secrets.token_urlsafe(16),
) -> None:
log.warning(
" NEXTCLOUD USERS: Creating nextcloud user: "
+ username
+ " in groups "
+ str(groups)
)
ev = Events("Add user", username)
ev = Events(self.app, "Add user", username)
try:
# Quota is "1 GB", "500 MB"
self.nextcloud.add_user_with_groups(
@ -1676,16 +1688,16 @@ class Admin:
except:
log.error(traceback.format_exc())
def delete_users(self, data):
ev = Events("Bulk actions", "Deleting users:", total=len(data))
def delete_users(self, data : List[DDUser]) -> None:
ev = Events(self.app, "Bulk actions", "Deleting users:", total=len(data))
for user in data:
ev.increment({"name": user["username"], "data": user["username"]})
self.delete_user(user["id"])
self.resync_data()
def delete_user(self, userid):
def delete_user(self, userid : str) -> bool:
log.warning("Deleting user moodle, nextcloud keycloak")
ev = Events("Deleting user", "Deleting from moodle")
ev = Events(self.app, "Deleting user", "Deleting from moodle")
self.delete_moodle_user(userid)
ev.update_text("Deleting from nextcloud")
self.delete_nextcloud_user(userid)
@ -1694,23 +1706,22 @@ class Admin:
ev.update_text("Syncing data from applications...")
self.resync_data()
ev.update_text("User deleted")
sio_event_send("delete_user", {"userid": userid})
sio_event_send(self.app, "delete_user", {"userid": userid})
return True
def get_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
def get_user(self, userid : str) -> Optional[DDUser]:
user : List[DDUser] = [u for u in self.internal["users"] if u["id"] == userid]
if not len(user):
return False
return None
return user[0]
def get_user_username(self, username):
user = [u for u in self.internal["users"] if u["username"] == username]
def get_user_username(self, username : str) -> Optional[DDUser]:
user : List[DDUser] = [u for u in self.internal["users"] if u["username"] == username]
if not len(user):
return False
return None
return user[0]
def add_user(self, u):
def add_user(self, u : DDUser) -> Any:
pathslist = []
for group in u["groups"]:
pathpart = ""
@ -1739,7 +1750,7 @@ class Admin:
### KEYCLOAK
#######################
ev = Events("Add user", u["username"], total=5)
ev = Events(self.app, "Add user", u["username"], total=5)
log.warning(" KEYCLOAK USERS: Adding user: " + u["username"])
uid = self.keycloak.add_user(
u["username"],
@ -1784,14 +1795,14 @@ class Admin:
ev.increment({"name": "Added to moodle", "data": []})
except UserExists:
log.error(" -->> User already exists")
error = Events("User already exists.", str(se), type="error")
error = Events(self.app, "User already exists.", str(se), type="error")
except SystemError as se:
log.error("Moodle create user error: " + str(se))
error = Events("Moodle create user error", str(se), type="error")
error = Events(self.app, "Moodle create user error", str(se), type="error")
except:
log.error(" -->> Error creating on moodle the user: " + u["username"])
print(traceback.format_exc())
error = Events("Internal error", "Check logs", type="error")
error = Events(self.app, "Internal error", "Check logs", type="error")
# Add user to cohort
## Get all existing moodle cohorts
@ -1847,30 +1858,29 @@ class Admin:
log.error(traceback.format_exc())
self.resync_data()
sio_event_send("new_user", u)
sio_event_send(self.app, "new_user", u)
return uid
def add_group(self, g):
def add_group(self, g : DDGroup) -> str:
# TODO: Check if exists
# We add in keycloak with his name, will be shown in app with full path with dots
if g["parent"] != None:
g["parent"] = gid2kpath(g["parent"])
new_path = self.keycloak.add_group(g["name"], g["parent"])
new_path_kc = self.keycloak.add_group(g["name"], g["parent"])
new_path : str = g["name"]
if g["parent"] != None:
new_path = kpath2gid(new_path["path"])
else:
new_path = g["name"]
new_path = kpath2gid(new_path_kc["path"])
self.moodle.add_system_cohort(new_path, description=g["description"])
self.nextcloud.add_group(new_path)
self.resync_data()
return new_path
def delete_group_by_id(self, group_id):
ev = Events("Deleting group", "Deleting from keycloak")
def delete_group_by_id(self, group_id : str) -> None:
ev = Events(self.app, "Deleting group", "Deleting from keycloak")
try:
keycloak_group = self.keycloak.get_group_by_id(group_id)
except Exception as e:
@ -1904,7 +1914,7 @@ class Admin:
self.nextcloud.delete_group(sg_gid)
self.resync_data()
def delete_group_by_path(self, path):
def delete_group_by_path(self, path : str) -> None:
group = self.keycloak.get_group_by_path(path)
to_be_deleted = []
@ -1926,5 +1936,5 @@ class Admin:
self.nextcloud.delete_group(gid)
self.resync_data()
def set_nextcloud_user_mail(self, data):
def set_nextcloud_user_mail(self, data : Any) -> None:
self.nextcloud.set_user_mail(data)

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -23,10 +24,11 @@ import logging as log
import os
import traceback
from flask import jsonify, request
from typing import Any, Dict, Union, List
from admin import app
from flask import request
# TODO: Improve these constants' structure
content_type = {"Content-Type": "application/json"}
ex = {
"bad_request": {
@ -96,8 +98,10 @@ ex = {
class Error(Exception):
def __init__(self, error="bad_request", description="", debug="", data=None):
self.error = ex[error]["error"].copy()
status_code : int
content_type : Dict[str, str]
def __init__(self, error : str ="bad_request", description : str="", debug : Union[str, List[str]]="", data : Any =None):
self.error : Dict[str, str] = (ex[error]["error"]).copy() # type: ignore # bad struct
self.error["function"] = (
inspect.stack()[1][1].split(os.sep)[-1]
+ ":"
@ -123,7 +127,7 @@ class Error(Exception):
"----------- REQUEST START -----------",
request.method + " " + request.url,
"\r\n".join("{}: {}".format(k, v) for k, v in request.headers.items()),
request.body if hasattr(request, "body") else "",
getattr(request, "body", ""),
"----------- REQUEST STOP -----------",
)
if request
@ -138,7 +142,7 @@ class Error(Exception):
if data
else ""
)
self.status_code = ex[error]["status_code"]
self.status_code = ex[error]["status_code"] # type: ignore # bad struct
self.content_type = content_type
log.debug(
"%s - %s - [%s -> %s]\r\n%s\r\n%s\r\n%s"
@ -152,11 +156,3 @@ class Error(Exception):
self.error["data"],
)
)
@app.errorhandler(Error)
def handle_user_error(ex):
response = jsonify(ex.error)
response.status_code = ex.status_code
response.headers = {"content-type": content_type}
return response

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -26,11 +27,13 @@ from minio.commonconfig import REPLACE, CopySource
from minio.deleteobjects import DeleteObject
from requests import get, post
from admin import app
from typing import Any, Callable, Dict, Iterable, List
class Avatars:
def __init__(self):
avatars_path : str
def __init__(self, avatars_path : str):
self.avatars_path = avatars_path
self.mclient = Minio(
"dd-sso-avatars:9000",
access_key="AKIAIOSFODNN7EXAMPLE",
@ -41,21 +44,22 @@ class Avatars:
self._minio_set_realm()
# self.update_missing_avatars()
def add_user_default_avatar(self, userid, role="unknown"):
def add_user_default_avatar(self, userid : str, role : str="unknown") -> None:
path = os.path.join(self.avatars_path, role) + ".jpg",
self.mclient.fput_object(
self.bucket,
userid,
os.path.join(app.root_path, "../custom/avatars/" + role + ".jpg"),
path,
content_type="image/jpeg ",
)
log.warning(
" AVATARS: Updated avatar for user " + userid + " with role " + role
)
def delete_user_avatar(self, userid):
def delete_user_avatar(self, userid : str) -> None:
self.minio_delete_object(userid)
def update_missing_avatars(self, users):
def update_missing_avatars(self, users : Iterable[Dict[str, Any]]) -> None:
sys_roles = ["admin", "manager", "teacher", "student"]
for u in self.get_users_without_image(users):
try:
@ -63,10 +67,11 @@ class Avatars:
except:
img = "unknown.jpg"
path = os.path.join(self.avatars_path, img)
self.mclient.fput_object(
self.bucket,
u["id"],
os.path.join(app.root_path, "../custom/avatars/" + img),
path,
content_type="image/jpeg ",
)
log.warning(
@ -76,26 +81,24 @@ class Avatars:
+ img.split(".")[0]
)
def _minio_set_realm(self):
def _minio_set_realm(self) -> None:
if not self.mclient.bucket_exists(self.bucket):
self.mclient.make_bucket(self.bucket)
def minio_get_objects(self):
def minio_get_objects(self) -> List[Any]:
return [o.object_name for o in self.mclient.list_objects(self.bucket)]
def minio_delete_all_objects(self):
delete_object_list = map(
lambda x: DeleteObject(x.object_name),
self.mclient.list_objects(self.bucket),
)
def minio_delete_all_objects(self) -> None:
f : Callable[[Any], Any] = lambda x: DeleteObject(x.object_name)
delete_object_list = map(f, self.mclient.list_objects(self.bucket))
errors = self.mclient.remove_objects(self.bucket, delete_object_list)
for error in errors:
log.error(" AVATARS: Error occured when deleting avatar object: " + error)
def minio_delete_object(self, oid):
def minio_delete_object(self, oid : str) -> None:
errors = self.mclient.remove_objects(self.bucket, [DeleteObject(oid)])
for error in errors:
log.error(" AVATARS: Error occured when deleting avatar object: " + error)
def get_users_without_image(self, users):
def get_users_without_image(self, users : Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
return [u for u in users if u["id"] and u["id"] not in self.minio_get_objects()]

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -29,16 +30,22 @@ import yaml
from PIL import Image
from schema import And, Optional, Schema, SchemaError, Use
from admin import app
from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from werkzeug import FileStorage
class Dashboard:
app : "AdminFlaskApp"
def __init__(
self,
):
app : "AdminFlaskApp",
) -> None:
self.app = app
self.custom_menu = os.path.join(app.root_path, "../custom/menu/custom.yaml")
def _update_custom_menu(self, custom_menu_part):
def _update_custom_menu(self, custom_menu_part : Dict[str, Any]) -> bool:
with open(self.custom_menu) as yml:
menu = yaml.load(yml, Loader=yaml.FullLoader)
menu = {**menu, **custom_menu_part}
@ -46,7 +53,7 @@ class Dashboard:
yml.write(yaml.dump(menu, default_flow_style=False))
return True
def update_colours(self, colours):
def update_colours(self, colours : Dict[str, Any]) -> bool:
schema_template = Schema(
{
"background": And(Use(str)),
@ -63,7 +70,7 @@ class Dashboard:
self._update_custom_menu({"colours": colours})
return self.apply_updates()
def update_menu(self, menu):
def update_menu(self, menu : Dict[str, Any]) -> bool:
items = []
for menu_item in menu.keys():
for mustexist_key in ["href", "icon", "name", "shortname"]:
@ -73,16 +80,16 @@ class Dashboard:
self._update_custom_menu({"apps_external": items})
return self.apply_updates()
def update_logo(self, logo):
def update_logo(self, logo : FileStorage) -> bool:
img = Image.open(logo.stream)
img.save(os.path.join(app.root_path, "../custom/img/logo.png"))
img.save(os.path.join(self.app.root_path, "../custom/img/logo.png"))
return self.apply_updates()
def update_background(self, background):
def update_background(self, background : FileStorage) -> bool:
img = Image.open(background.stream)
img.save(os.path.join(app.root_path, "../custom/img/background.png"))
img.save(os.path.join(self.app.root_path, "../custom/img/background.png"))
return self.apply_updates()
def apply_updates(self):
def apply_updates(self) -> bool:
resp = requests.get("http://dd-sso-api:7039/restart")
return True

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -38,34 +39,46 @@ from flask_socketio import (
send,
)
from admin import app
from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
def sio_event_send(event, data):
def sio_event_send(app : "AdminFlaskApp", event : str, data : Dict[str, Any]) -> None:
app.socketio.emit(
event,
json.dumps(data),
namespace="/sio/events",
room="events",
)
# TODO: Why on earth do we find these all over the place?
sleep(0.001)
class Events:
def __init__(self, title, text="", total=0, table=False, type="info"):
app : "AdminFlaskApp"
eid : str
title : str
text : str
total : int
table : bool
type : str
def __init__(self, app : "AdminFlaskApp", title : str, text : str="", total : int=0, table : bool=False, type : str="info") -> None:
self.app = app
# notice, info, success, and error
self.eid = str(base64.b64encode(os.urandom(32))[:8])
self.title = title
self.text = text
self.total = total
# TODO: this is probably replacing the .table method????
self.table = table
self.item = 0
self.type = type
self.create()
def create(self):
def create(self) -> None:
log.info("START " + self.eid + ": " + self.text)
app.socketio.emit(
self.app.socketio.emit(
"notify-create",
json.dumps(
{
@ -80,9 +93,9 @@ class Events:
)
sleep(0.001)
def __del__(self):
def __del__(self) -> None:
log.info("END " + self.eid + ": " + self.text)
app.socketio.emit(
self.app.socketio.emit(
"notify-destroy",
json.dumps({"id": self.eid}),
namespace="/sio",
@ -90,9 +103,9 @@ class Events:
)
sleep(0.001)
def update_text(self, text):
def update_text(self, text : str) -> None:
self.text = text
app.socketio.emit(
self.app.socketio.emit(
"notify-update",
json.dumps(
{
@ -105,9 +118,9 @@ class Events:
)
sleep(0.001)
def append_text(self, text):
def append_text(self, text : str) -> None:
self.text = self.text + "<br>" + text
app.socketio.emit(
self.app.socketio.emit(
"notify-update",
json.dumps(
{
@ -120,10 +133,10 @@ class Events:
)
sleep(0.001)
def increment(self, data={"name": "", "data": []}):
def increment(self, data : Dict[str, Any]={"name": "", "data": []}) -> None:
self.item += 1
log.info("INCREMENT " + self.eid + ": " + self.text)
app.socketio.emit(
self.app.socketio.emit(
"notify-increment",
json.dumps(
{
@ -149,10 +162,10 @@ class Events:
)
sleep(0.0001)
def decrement(self, data={"name": "", "data": []}):
def decrement(self, data : Dict[str, Any]={"name": "", "data": []}) -> None:
self.item -= 1
log.info("DECREMENT " + self.eid + ": " + self.text)
app.socketio.emit(
self.app.socketio.emit(
"notify-decrement",
json.dumps(
{
@ -178,13 +191,13 @@ class Events:
)
sleep(0.001)
def reload(self):
app.socketio.emit("reload", json.dumps({}), namespace="/sio", room="admin")
def reload(self) -> None:
self.app.socketio.emit("reload", json.dumps({}), namespace="/sio", room="admin")
sleep(0.0001)
def table(self, event, table, data={}):
def table(self, event : str, table : bool, data : Dict[str, Any]={}) -> None:
# refresh, add, delete, update
app.socketio.emit(
self.app.socketio.emit(
"table_" + event,
json.dumps({"table": table, "data": data}),
namespace="/sio",

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -22,8 +23,11 @@ import string
from collections import Counter
from pprint import pprint
from typing import Any, Dict, Generator, Iterable, Optional, List
def get_recursive_groups(l_groups, l):
DDGroup = Dict[str, Any]
def get_recursive_groups(l_groups : Iterable[DDGroup], l : List[DDGroup]) -> List[DDGroup]:
for d_group in l_groups:
data = {}
for key, value in d_group.items():
@ -35,11 +39,11 @@ def get_recursive_groups(l_groups, l):
return l
def get_group_with_childs(keycloak_group):
def get_group_with_childs(keycloak_group : DDGroup) -> List[str]:
return [g["path"] for g in get_recursive_groups([keycloak_group], [])]
def system_username(username):
def system_username(username : str) -> bool:
return (
True
if username in ["guest", "ddadmin", "admin"] or username.startswith("system_")
@ -47,41 +51,43 @@ def system_username(username):
)
def system_group(groupname):
def system_group(groupname : str) -> bool:
return True if groupname in ["admin", "manager", "teacher", "student"] else False
def get_group_from_group_id(group_id, groups):
def get_group_from_group_id(group_id : str, groups : Iterable[DDGroup]) -> Optional[DDGroup]:
return next((d for d in groups if d.get("id") == group_id), None)
def get_kid_from_kpath(kpath, groups):
ids = [g["id"] for g in groups if g["path"] == kpath]
if not len(ids) or len(ids) > 1:
return False
def get_kid_from_kpath(kpath : str, groups : Iterable[DDGroup]) -> Optional[str]:
ids : List[str] = [g["id"] for g in groups if g["path"] == kpath]
if len(ids) != 1:
return None
return ids[0]
def get_gid_from_kgroup_id(kgroup_id, groups):
return [
def get_gid_from_kgroup_id(kgroup_id : str, groups : Iterable[DDGroup]) -> str:
# TODO: Why is this interface different from get_kid_from_kpath?
o : List[str] = [
g["path"].replace("/", ".")[1:] if len(g["path"].split("/")) else g["path"][1:]
for g in groups
if g["id"] == kgroup_id
][0]
]
return o[0]
def get_gids_from_kgroup_ids(kgroup_ids, groups):
def get_gids_from_kgroup_ids(kgroup_ids : Iterable[str], groups : Iterable[DDGroup]) -> List[str]:
return [get_gid_from_kgroup_id(kgroup_id, groups) for kgroup_id in kgroup_ids]
def kpath2gid(path):
def kpath2gid(path : str) -> str:
# print(path.replace('/','.')[1:])
if path.startswith("/"):
return path.replace("/", ".")[1:]
return path.replace("/", ".")
def kpath2gids(path):
def kpath2gids(path : str) -> List[str]:
path = kpath2gid(path)
l = []
for i in range(len(path.split("."))):
@ -89,44 +95,45 @@ def kpath2gids(path):
return l
def kpath2kpaths(path):
def kpath2kpaths(path : str) -> List[str]:
l = []
for i in range(len(path.split("/"))):
l.append("/".join(path.split("/")[: i + 1]))
return l[1:]
def gid2kpath(gid):
def gid2kpath(gid : str) -> str:
return "/" + gid.replace(".", "/")
def count_repeated(itemslist):
def count_repeated(itemslist : Iterable[Any]) -> None:
print(Counter(itemslist))
def groups_kname2gid(groups):
def groups_kname2gid(groups : Iterable[str]) -> List[str]:
return [name.replace(".", "/") for name in groups]
def groups_path2id(groups):
def groups_path2id(groups : Iterable[str]) -> List[str]:
return [g.replace("/", ".")[1:] for g in groups]
def groups_id2path(groups):
def groups_id2path(groups : Iterable[str]) -> List[str]:
return ["/" + g.replace(".", "/") for g in groups]
def filter_roles_list(role_list):
def filter_roles_list(role_list : Iterable[str]) -> List[str]:
client_roles = ["admin", "manager", "teacher", "student"]
return [r for r in role_list if r in client_roles]
def filter_roles_listofdicts(role_listofdicts):
def filter_roles_listofdicts(role_listofdicts : Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
client_roles = ["admin", "manager", "teacher", "student"]
return [r for r in role_listofdicts if r["name"] in client_roles]
def rand_password(lenght):
def rand_password(lenght : int) -> str:
# TODO: why is this not using py3's secrets?
characters = string.ascii_letters + string.digits + string.punctuation
passwd = "".join(random.choice(characters) for i in range(lenght))
while not any(ele.isupper() for ele in passwd):

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -34,23 +35,33 @@ from .api_exceptions import Error
from .helpers import get_recursive_groups, kpath2kpaths
from .postgres import Postgres
# from admin import app
from typing import cast, Any, Dict, Iterable, List, Optional
DDUser = Dict[str, Any]
# TODO: Improve typing of these class and simplify it
class KeycloakClient:
"""https://www.keycloak.org/docs-api/13.0/rest-api/index.html
https://github.com/marcospereirampj/python-keycloak
https://gist.github.com/kaqfa/99829941121188d7cef8271f93f52f1f
"""
url : str
username : str
password : str
realm : str
verify : bool
keycloak_pg : Postgres
keycloak_admin : KeycloakAdmin
def __init__(
self,
url="http://dd-sso-keycloak:8080/auth/",
username=os.environ["KEYCLOAK_USER"],
password=os.environ["KEYCLOAK_PASSWORD"],
realm="master",
verify=True,
):
url : str="http://dd-sso-keycloak:8080/auth/",
username : str=os.environ["KEYCLOAK_USER"],
password : str=os.environ["KEYCLOAK_PASSWORD"],
realm : str="master",
verify : bool=True,
) -> None:
self.url = url
self.username = username
self.password = password
@ -64,7 +75,7 @@ class KeycloakClient:
os.environ["KEYCLOAK_DB_PASSWORD"],
)
def connect(self):
def connect(self) -> None:
self.keycloak_admin = KeycloakAdmin(
server_url=self.url,
username=self.username,
@ -78,15 +89,19 @@ class KeycloakClient:
""" USERS """
def get_user_id(self, username):
def get_user_id(self, username : str) -> str:
self.connect()
return self.keycloak_admin.get_user_id(username)
uid : str = self.keycloak_admin.get_user_id(username)
return uid
def get_users(self):
def get_users(self) -> Iterable[Dict[str, Any]]:
# https://www.keycloak.org/docs-api/18.0/rest-api/index.html#_userrepresentation
self.connect()
return self.keycloak_admin.get_users({})
o : Iterable[Dict[str, Any]] = self.keycloak_admin.get_users({})
return o
def get_users_with_groups_and_roles(self):
# TODO: what is this actually doing?
def get_users_with_groups_and_roles(self) -> List[DDUser]:
q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, u.enabled, ua.value as quota
,json_agg(g."id") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2
,json_agg(r.name) as role
@ -125,7 +140,7 @@ class KeycloakClient:
return list_dict_users
def getparent(self, group_id, data):
def getparent(self, group_id : str, data : Iterable[Any]) -> str:
# Recursively get full path from any group_id in the tree
path = ""
for item in data:
@ -134,14 +149,14 @@ class KeycloakClient:
path = f"{path}/{item[1]}"
return path
def get_group_path(self, group_id):
def get_group_path(self, group_id : str) -> str:
# Get full path using getparent recursive func
# RETURNS: String with full path
q = """SELECT * FROM keycloak_group"""
groups = self.keycloak_pg.select(q)
return self.getparent(group_id, groups)
def get_user_groups_paths(self, user_id):
def get_user_groups_paths(self, user_id : str) -> List[str]:
# Get full paths for user grups
# RETURNS list of paths
q = """SELECT group_id FROM user_group_membership WHERE user_id = '%s'""" % (
@ -165,20 +180,20 @@ class KeycloakClient:
def add_user(
self,
username,
first,
last,
email,
password,
group=False,
password_temporary=True,
enabled=True,
):
username : str,
first : str,
last : str,
email : str,
password : str,
group : Any=False,
password_temporary : bool=True,
enabled : bool=True,
) -> Any:
# RETURNS string with keycloak user id (the main id in this app)
self.connect()
username = username.lower()
try:
uid = self.keycloak_admin.create_user(
uid : Any = self.keycloak_admin.create_user(
{
"email": email,
"username": username,
@ -213,7 +228,7 @@ class KeycloakClient:
self.keycloak_admin.group_user_add(uid, gid)
return uid
def update_user_pwd(self, user_id, password, password_temporary=True):
def update_user_pwd(self, user_id : str, password : str, password_temporary : bool=True) -> Any:
# Updates
payload = {
"credentials": [
@ -223,7 +238,7 @@ class KeycloakClient:
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def user_update(self, user_id, enabled, email, first, last, groups=[], roles=[]):
def user_update(self, user_id : str, enabled : bool, email : str, first : str, last : str, groups : Iterable[str]=[], roles : Iterable[str]=[]) -> Any:
## NOTE: Roles didn't seem to be updated/added. Also not confident with groups
# Updates
payload = {
@ -237,17 +252,17 @@ class KeycloakClient:
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def user_enable(self, user_id):
def user_enable(self, user_id : str) -> Any:
payload = {"enabled": True}
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def user_disable(self, user_id):
def user_disable(self, user_id : str) -> Any:
payload = {"enabled": False}
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def group_user_remove(self, user_id, group_id):
def group_user_remove(self, user_id : str, group_id : str) -> Any:
self.connect()
return self.keycloak_admin.group_user_remove(user_id, group_id)
@ -255,7 +270,7 @@ class KeycloakClient:
# self.connect()
# return self.keycloak_admin.assign_role(client_id=client_id, user_id=user_id, role_id=role_id, role_name="test")
def remove_user_realm_roles(self, user_id, roles):
def remove_user_realm_roles(self, user_id : str, roles : Iterable[str]) -> Any:
self.connect()
roles = [
r
@ -264,66 +279,66 @@ class KeycloakClient:
]
return self.keycloak_admin.delete_user_realm_role(user_id, roles)
def delete_user(self, userid):
def delete_user(self, userid : str) -> Any:
self.connect()
return self.keycloak_admin.delete_user(user_id=userid)
def get_user_groups(self, userid):
def get_user_groups(self, userid : str) -> Any:
self.connect()
return self.keycloak_admin.get_user_groups(user_id=userid)
def get_user_realm_roles(self, userid):
def get_user_realm_roles(self, userid : str) -> Any:
self.connect()
return self.keycloak_admin.get_realm_roles_of_user(user_id=userid)
def add_user_client_role(self, client_id, user_id, role_id, role_name):
def add_user_client_role(self, client_id : str, user_id : str, role_id : str, role_name : str) -> Any:
self.connect()
return self.keycloak_admin.assign_client_role(
client_id=client_id, user_id=user_id, role_id=role_id, role_name="test"
)
## GROUPS
def get_all_groups(self):
def get_all_groups(self) -> Iterable[Any]:
## RETURNS ONLY MAIN GROUPS WITH NESTED subGroups list
self.connect()
return self.keycloak_admin.get_groups()
return cast(Iterable[Any], self.keycloak_admin.get_groups())
def get_groups(self, with_subgroups=True):
def get_groups(self, with_subgroups : bool=True) -> Iterable[Any]:
## RETURNS ALL GROUPS in root list
self.connect()
groups = self.keycloak_admin.get_groups()
return get_recursive_groups(groups, [])
def get_group_by_id(self, group_id):
def get_group_by_id(self, group_id : str) -> Any:
self.connect()
return self.keycloak_admin.get_group(group_id=group_id)
def get_group_by_path(self, path, recursive=True):
def get_group_by_path(self, path : str, recursive : bool=True) -> Any:
self.connect()
return self.keycloak_admin.get_group_by_path(
path=path, search_in_subgroups=recursive
)
def add_group(self, name, parent=None, skip_exists=False):
def add_group(self, name : str, parent : str="", skip_exists : bool=False) -> Any:
self.connect()
if parent != None:
if parent:
parent = self.get_group_by_path(parent)["id"]
return self.keycloak_admin.create_group({"name": name}, parent=parent)
def delete_group(self, group_id):
def delete_group(self, group_id : str) -> Any:
self.connect()
return self.keycloak_admin.delete_group(group_id=group_id)
def group_user_add(self, user_id, group_id):
def group_user_add(self, user_id : str, group_id : str) -> Any:
self.connect()
return self.keycloak_admin.group_user_add(user_id, group_id)
def add_group_tree(self, path):
def add_group_tree(self, path : str) -> None:
paths = kpath2kpaths(path)
parent = "/"
for path in paths:
try:
parent_path = None if parent == "/" else parent
parent_path = "" if parent == "/" else parent
# print("parent: "+str(parent_path)+" path: "+path.split("/")[-1])
self.add_group(path.split("/")[-1], parent_path, skip_exists=True)
parent = path
@ -333,8 +348,8 @@ class KeycloakClient:
parent = path
def add_user_with_groups_and_role(
self, username, first, last, email, password, role, groups
):
self, username : str, first : str, last : str, email : str, password : str, role : str, groups : Iterable[str]
) -> None:
## Add user
uid = self.add_user(username, first, last, email, password)
## Add user to role
@ -348,7 +363,7 @@ class KeycloakClient:
for g in groups:
log.warning("Creating keycloak group: " + g)
parts = g.split("/")
parent_path = None
parent_path = ""
for i in range(1, len(parts)):
# parent_id=None if parent_path==None else self.get_group(parent_path)['id']
try:
@ -360,10 +375,7 @@ class KeycloakClient:
+ " already exists. Skipping creation"
)
pass
if parent_path is None:
thepath = "/" + parts[i]
else:
thepath = parent_path + "/" + parts[i]
thepath = parent_path + "/" + parts[i]
if thepath == "/":
log.warning(
"Not adding the user "
@ -385,53 +397,51 @@ class KeycloakClient:
)
self.keycloak_admin.group_user_add(uid, gid)
if parent_path == None:
parent_path = ""
parent_path = parent_path + "/" + parts[i]
parent_path += "/" + parts[i]
# self.group_user_add(uid,gid)
## ROLES
def get_roles(self):
def get_roles(self) -> Iterable[Any]:
self.connect()
return self.keycloak_admin.get_realm_roles()
return cast(Iterable[Any], self.keycloak_admin.get_realm_roles())
def get_role(self, name):
def get_role(self, name : str) -> Any:
self.connect()
return self.keycloak_admin.get_realm_role(name)
def add_role(self, name, description=""):
def add_role(self, name : str, description : str="") -> Any:
self.connect()
return self.keycloak_admin.create_realm_role(
{"name": name, "description": description}
)
def delete_role(self, name):
def delete_role(self, name : str) -> Any:
self.connect()
return self.keycloak_admin.delete_realm_role(name)
## CLIENTS
def get_client_roles(self, client_id):
def get_client_roles(self, client_id : str) -> Any:
self.connect()
return self.keycloak_admin.get_client_roles(client_id=client_id)
def add_client_role(self, client_id, name, description=""):
def add_client_role(self, client_id : str, name : str, description : str="") -> Any:
self.connect()
return self.keycloak_admin.create_client_role(
client_id, {"name": name, "description": description, "clientRole": True}
)
## SYSTEM
def get_server_info(self):
def get_server_info(self) -> Any:
self.connect()
return self.keycloak_admin.get_server_info()
def get_server_clients(self):
def get_server_clients(self) -> Any:
self.connect()
return self.keycloak_admin.get_clients()
def get_server_rsa_key(self):
def get_server_rsa_key(self) -> Any:
self.connect()
rsa_key = [
k for k in self.keycloak_admin.get_keys()["keys"] if k["type"] == "RSA"
@ -439,22 +449,21 @@ class KeycloakClient:
return {"name": rsa_key["kid"], "certificate": rsa_key["certificate"]}
## REALM
def assign_realm_roles(self, user_id, role):
def assign_realm_roles(self, user_id : str, role : str) -> Any:
self.connect()
try:
role = [
kcroles = [
r for r in self.keycloak_admin.get_realm_roles() if r["name"] == role
]
except:
return False
return self.keycloak_admin.assign_realm_roles(user_id=user_id, roles=role)
# return self.keycloak_admin.assign_realm_roles(user_id=user_id, client_id=None, roles=role)
return self.keycloak_admin.assign_realm_roles(user_id=user_id, roles=kcroles)
## CLIENTS
def delete_client(self, clientid):
def delete_client(self, clientid : str) -> Any:
self.connect()
return self.keycloak_admin.delete_client(clientid)
def add_client(self, client):
def add_client(self, client : str) -> Any:
self.connect()
return self.keycloak_admin.create_client(client)

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -21,7 +22,6 @@ import logging as log
import os
import traceback
from admin import app
from pprint import pprint
from minio import Minio
@ -29,18 +29,22 @@ from minio.commonconfig import REPLACE, CopySource
from minio.deleteobjects import DeleteObject
from requests import get, post
legal_path= os.path.join(app.root_path, "static/templates/pages/legal/")
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
def get_legal(lang):
with open(legal_path+lang, "r") as languagefile:
# TODO: Fix all this
def get_legal(app : "AdminFlaskApp", lang : str) -> str:
with open(app.legal_path+lang, "r") as languagefile:
return languagefile.read()
def gen_legal_if_not_exists(lang):
if not os.path.isfile(legal_path+lang):
def gen_legal_if_not_exists(app : "AdminFlaskApp", lang : str) -> None:
if not os.path.isfile(app.legal_path+lang):
log.debug("Creating new language file")
with open(legal_path+lang, "w") as languagefile:
with open(app.legal_path+lang, "w") as languagefile:
languagefile.write("<b>Legal</b><br>This is the default legal page for language " + lang)
def new_legal(lang,html):
with open(legal_path+lang, "w") as languagefile:
def new_legal(app : "AdminFlaskApp", lang : str, html : str) -> None:
with open(app.legal_path+lang, "w") as languagefile:
languagefile.write(html)

View File

@ -1,94 +0,0 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
#
# This file is part of DD
#
# DD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# DD is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import logging as log
import os
import sys
import traceback
import yaml
from cerberus import Validator, rules_set_registry, schema_registry
from admin import app
class AdminValidator(Validator):
None
# def _normalize_default_setter_genid(self, document):
# return _parse_string(document["name"])
# def _normalize_default_setter_genidlower(self, document):
# return _parse_string(document["name"]).lower()
# def _normalize_default_setter_gengroupid(self, document):
# return _parse_string(
# document["parent_category"] + "-" + document["uid"]
# ).lower()
def load_validators(purge_unknown=True):
validators = {}
schema_path = os.path.join(app.root_path, "schemas")
for schema_filename in os.listdir(schema_path):
try:
with open(os.path.join(schema_path, schema_filename)) as file:
schema_yml = file.read()
schema = yaml.load(schema_yml, Loader=yaml.FullLoader)
validators[schema_filename.split(".")[0]] = AdminValidator(
schema, purge_unknown=purge_unknown
)
except IsADirectoryError:
None
return validators
app.validators = load_validators()
class loadConfig:
def __init__(self, app=None):
try:
app.config.setdefault("DOMAIN", os.environ["DOMAIN"])
app.config.setdefault(
"KEYCLOAK_POSTGRES_USER", os.environ["KEYCLOAK_DB_USER"]
)
app.config.setdefault(
"KEYCLOAK_POSTGRES_PASSWORD", os.environ["KEYCLOAK_DB_PASSWORD"]
)
app.config.setdefault(
"MOODLE_POSTGRES_USER", os.environ["MOODLE_POSTGRES_USER"]
)
app.config.setdefault(
"MOODLE_POSTGRES_PASSWORD", os.environ["MOODLE_POSTGRES_PASSWORD"]
)
app.config.setdefault(
"NEXTCLOUD_POSTGRES_USER", os.environ["NEXTCLOUD_POSTGRES_USER"]
)
app.config.setdefault(
"NEXTCLOUD_POSTGRES_PASSWORD", os.environ["NEXTCLOUD_POSTGRES_PASSWORD"]
)
app.config.setdefault(
"VERIFY", True if os.environ["VERIFY"] == "true" else False
)
app.config.setdefault("API_SECRET", os.environ.get("API_SECRET"))
except Exception as e:
log.error(traceback.format_exc())
raise

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -23,11 +24,15 @@ from pprint import pprint
from requests import get, post
from admin import app
from .exceptions import UserExists, UserNotFound
from .postgres import Postgres
from typing import TYPE_CHECKING, cast, Any, Dict, Iterable, List, Optional
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
# Module variables to connect to moodle api
@ -36,18 +41,20 @@ class Moodle:
https://docs.moodle.org/dev/Web_service_API_functions
https://docs.moodle.org/311/en/Using_web_services
"""
key: str
url : str
endpoint : str
verify : bool
moodle_pg : Postgres
def __init__(
self,
key=app.config["MOODLE_WS_TOKEN"],
url="https://moodle." + app.config["DOMAIN"],
endpoint="/webservice/rest/server.php",
verify=app.config["VERIFY"],
):
self.key = key
self.url = url
app : "AdminFlaskApp",
endpoint : str="/webservice/rest/server.php",
) -> None:
self.key = app.config["MOODLE_WS_TOKEN"]
self.url = f"https://moodle.{ app.config['DOMAIN'] }"
self.endpoint = endpoint
self.verify = verify
self.verify = cast(bool, app.config["VERIFY"])
self.moodle_pg = Postgres(
"dd-apps-postgresql",
@ -56,7 +63,7 @@ class Moodle:
app.config["MOODLE_POSTGRES_PASSWORD"],
)
def rest_api_parameters(self, in_args, prefix="", out_dict=None):
def rest_api_parameters(self, in_args : Any, prefix : str="", out_dict : Optional[Dict]=None) -> Dict[Any, Any]:
"""Transform dictionary/array structure to a flat dictionary, with key names
defining the structure.
Example usage:
@ -64,24 +71,23 @@ class Moodle:
{'courses[0][id]':1,
'courses[0][name]':'course1'}
"""
if out_dict == None:
out_dict = {}
o : Dict[Any, Any] = {} if out_dict is None else out_dict
if not type(in_args) in (list, dict):
out_dict[prefix] = in_args
return out_dict
o[prefix] = in_args
return o
if prefix == "":
prefix = prefix + "{0}"
else:
prefix = prefix + "[{0}]"
if type(in_args) == list:
for idx, item in enumerate(in_args):
self.rest_api_parameters(item, prefix.format(idx), out_dict)
self.rest_api_parameters(item, prefix.format(idx), o)
elif type(in_args) == dict:
for key, item in in_args.items():
self.rest_api_parameters(item, prefix.format(key), out_dict)
return out_dict
self.rest_api_parameters(item, prefix.format(key), o)
return o
def call(self, fname, **kwargs):
def call(self, fname : str, **kwargs : Any) -> Any:
"""Calls moodle API function with function name fname and keyword arguments.
Example:
>>> call_mdl_function('core_course_update_courses',
@ -97,7 +103,7 @@ class Moodle:
raise SystemError(response)
return response
def create_user(self, email, username, password, first_name="-", last_name="-"):
def create_user(self, email : str, username : str, password : str, first_name : str="-", last_name : str="-") -> Any:
if len(self.get_user_by("username", username)["users"]):
raise UserExists
try:
@ -115,7 +121,7 @@ class Moodle:
except SystemError as se:
raise SystemError(se.args[0]["message"])
def update_user(self, username, email, first_name, last_name, enabled=True):
def update_user(self, username : str, email : str, first_name : str, last_name : str, enabled : bool=True) -> Any:
user = self.get_user_by("username", username)["users"][0]
if not len(user):
raise UserNotFound
@ -135,15 +141,15 @@ class Moodle:
except SystemError as se:
raise SystemError(se.args[0]["message"])
def delete_user(self, user_id):
def delete_user(self, user_id : str) -> Any:
user = self.call("core_user_delete_users", userids=[user_id])
return user
def delete_users(self, userids):
def delete_users(self, userids : List[str]) -> Any:
user = self.call("core_user_delete_users", userids=userids)
return user
def get_user_by(self, key, value):
def get_user_by(self, key : str, value : str) -> Any:
criteria = [{"key": key, "value": value}]
try:
user = self.call("core_user_get_users", criteria=criteria)
@ -152,7 +158,7 @@ class Moodle:
return user
# {'users': [{'id': 8, 'username': 'asdfw', 'firstname': 'afowie', 'lastname': 'aokjdnfwe', 'fullname': 'afowie aokjdnfwe', 'email': 'awfewe@ads.com', 'department': '', 'firstaccess': 0, 'lastaccess': 0, 'auth': 'manual', 'suspended': False, 'confirmed': True, 'lang': 'ca', 'theme': '', 'timezone': '99', 'mailformat': 1, 'profileimageurlsmall': 'https://moodle.mydomain.duckdns.org/theme/image.php/cbe/core/1630941606/u/f2', 'profileimageurl': 'https://DOMAIN/theme/image.php/cbe/core/1630941606/u/f1'}], 'warnings': []}
def get_users_with_groups_and_roles(self):
def get_users_with_groups_and_roles(self) -> List[Dict[Any, Any]]:
q = """select u.id as id, username, firstname as first, lastname as last, email, json_agg(h.name) as groups, json_agg(r.shortname) as roles
from mdl_user as u
LEFT JOIN mdl_cohort_members AS hm on hm.userid = u.id
@ -179,31 +185,31 @@ class Moodle:
# user['roles']=[]
# return users
def enroll_user_to_course(self, user_id, course_id, role_id=5):
def enroll_user_to_course(self, user_id : str, course_id : str, role_id : int=5) -> Any:
# 5 is student
data = [{"roleid": role_id, "userid": user_id, "courseid": course_id}]
enrolment = self.call("enrol_manual_enrol_users", enrolments=data)
return enrolment
def get_quiz_attempt(self, quiz_id, user_id):
def get_quiz_attempt(self, quiz_id : str, user_id : str) -> Any:
attempts = self.call(
"mod_quiz_get_user_attempts", quizid=quiz_id, userid=user_id
)
return attempts
def get_cohorts(self):
def get_cohorts(self) -> Any:
cohorts = self.call("core_cohort_get_cohorts")
return cohorts
def add_system_cohort(self, name, description="", visible=True):
visible = 1 if visible else 0
def add_system_cohort(self, name : str, description : str ="", visible : bool=True) -> Any:
bit_visible = 1 if visible else 0
data = [
{
"categorytype": {"type": "system", "value": ""},
"name": name,
"idnumber": name,
"description": description,
"visible": visible,
"visible": bit_visible,
}
]
cohort = self.call("core_cohort_create_cohorts", cohorts=data)
@ -214,7 +220,7 @@ class Moodle:
# user = self.call('core_cohort_add_cohort_members', criteria=criteria)
# return user
def add_user_to_cohort(self, userid, cohortid):
def add_user_to_cohort(self, userid : str, cohortid : str) -> Any:
members = [
{
"cohorttype": {"type": "id", "value": cohortid},
@ -224,21 +230,21 @@ class Moodle:
user = self.call("core_cohort_add_cohort_members", members=members)
return user
def delete_user_in_cohort(self, userid, cohortid):
def delete_user_in_cohort(self, userid : str, cohortid : str) -> Any:
members = [{"cohortid": cohortid, "userid": userid}]
user = self.call("core_cohort_delete_cohort_members", members=members)
return user
def get_cohort_members(self, cohort_ids):
def get_cohort_members(self, cohort_ids : str) -> Any:
members = self.call("core_cohort_get_cohort_members", cohortids=cohort_ids)
# [0]['userids']
return members
def delete_cohorts(self, cohortids):
def delete_cohorts(self, cohortids : Iterable[str]) -> Any:
deleted = self.call("core_cohort_delete_cohorts", cohortids=cohortids)
return deleted
def get_user_cohorts(self, user_id):
def get_user_cohorts(self, user_id : str) -> Any:
user_cohorts = []
cohorts = self.get_cohorts()
for cohort in cohorts:
@ -246,7 +252,7 @@ class Moodle:
user_cohorts.append(cohort)
return user_cohorts
def add_user_to_siteadmin(self, user_id):
def add_user_to_siteadmin(self, user_id : str) -> Any:
q = """SELECT value FROM mdl_config WHERE name='siteadmins'"""
value = self.moodle_pg.select(q)[0][0]
if str(user_id) not in value:

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -18,32 +19,29 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import json
import logging as log
import time
import traceback
from datetime import datetime, timedelta
import mysql.connector
import yaml
# from admin import app
from typing import List, Tuple
class Mysql:
def __init__(self, host, database, user, password):
# TODO: Fix this whole class
cur : mysql.connector.MySQLCursor
conn : mysql.connector.MySQLConnection
def __init__(self, host : str, database : str, user : str, password : str) -> None:
self.conn = mysql.connector.connect(
host=host, database=database, user=user, password=password
)
def select(self, sql):
def select(self, sql : str) -> List[Tuple]:
self.cur = self.conn.cursor()
self.cur.execute(sql)
data = self.cur.fetchall()
data : List[Tuple] = self.cur.fetchall()
self.cur.close()
return data
def update(self, sql):
def update(self, sql : str) -> None:
# TODO: Fix this whole method
self.cur = self.conn.cursor()
self.cur.execute(sql)
self.conn.commit()

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -30,21 +31,31 @@ import urllib
import requests
from psycopg2 import sql
# from ..lib.log import *
from admin import app
from .nextcloud_exc import *
from .postgres import Postgres
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
DDUser = Dict[Any, Any]
class Nextcloud:
verify_cert : bool
apiurl : str
shareurl : str
davurl : str
auth : Tuple[str, str]
user : str
nextcloud_pg : Postgres
def __init__(
self,
url="https://nextcloud." + app.config["DOMAIN"],
username=os.environ["NEXTCLOUD_ADMIN_USER"],
password=os.environ["NEXTCLOUD_ADMIN_PASSWORD"],
verify=True,
):
app : "AdminFlaskApp",
username : str=os.environ["NEXTCLOUD_ADMIN_USER"],
password : str=os.environ["NEXTCLOUD_ADMIN_PASSWORD"],
verify : bool=True,
) -> None:
url = "https://nextcloud." + app.config["DOMAIN"]
self.verify_cert = verify
self.apiurl = url + "/ocs/v1.php/cloud/"
@ -61,9 +72,9 @@ class Nextcloud:
)
def _request(
self, method, url, data={}, headers={"OCS-APIRequest": "true"}, auth=False
):
if auth == False:
self, method : str, url : str, data : Any={}, headers : Dict[str, str]={"OCS-APIRequest": "true"}, auth : Optional[Tuple[str, str]]=None
) -> str:
if auth is None:
auth = self.auth
try:
response = requests.request(
@ -96,7 +107,7 @@ class Nextcloud:
raise ProviderConnError
raise ProviderError
def check_connection(self):
def check_connection(self) -> bool:
url = self.apiurl + "users/" + self.user + "?format=json"
try:
result = self._request("GET", url)
@ -118,7 +129,7 @@ class Nextcloud:
raise ProviderConnError
raise ProviderError
def get_user(self, userid):
def get_user(self, userid : str) -> Any:
url = self.apiurl + "users/" + userid + "?format=json"
try:
result = json.loads(self._request("GET", url))
@ -148,7 +159,7 @@ class Nextcloud:
# users_with_lists = [list(l[:-2])+([[]] if l[-2] == [None] else [list(set(l[-2]))]) + ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users]
# users_with_lists = [list(l[:-2])+([[]] if l[-2] == [None] else [list(set(l[-2]))]) + ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users_with_lists]
# list_dict_users = [dict(zip(fields, r)) for r in users_with_lists]
def get_users_list(self):
def get_users_list(self) -> List[DDUser]:
# q = """select u.uid as username, adn.value as displayname, ade.value as email, json_agg(gg.displayname) as admin_groups,json_agg(g.displayname) as groups
# from oc_users as u
# left join oc_group_user as gu on gu.uid = u.uid
@ -200,9 +211,10 @@ class Nextcloud:
# log.error(traceback.format_exc())
# raise
# TODO: Improve typing of these functions...
def add_user(
self, userid, userpassword, quota=False, group=False, email="", displayname=""
):
self, userid : str, userpassword : str, quota : Any=False, group : Any=False, email : str="", displayname : str=""
) -> bool:
data = {
"userid": userid,
"password": userpassword,
@ -247,7 +259,7 @@ class Nextcloud:
# 106 - no group specified (required for subadmins)
# 107 - all errors that contain a hint - for example “Password is among the 1,000,000 most common ones. Please make it unique.” (this code was added in 12.0.6 & 13.0.1)
def update_user(self, userid, key_values):
def update_user(self, userid : str, key_values : Dict[str, Any]) -> bool:
# key_values={'quota':quota,'email':email,'displayname':displayname}
url = self.apiurl + "users/" + userid + "?format=json"
@ -262,6 +274,8 @@ class Nextcloud:
result = json.loads(
self._request("PUT", url, data=data, headers=headers)
)
# TODO: It seems like this only sets the first item in key_values
# This function probably doesn't do what it is supposed to
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 102:
@ -273,8 +287,9 @@ class Nextcloud:
except:
# log.error(traceback.format_exc())
raise
return False
def add_user_to_group(self, userid, group_id):
def add_user_to_group(self, userid : str, group_id : str) -> bool:
data = {"groupid": group_id}
url = self.apiurl + "users/" + userid + "/groups?format=json"
@ -296,7 +311,7 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
def remove_user_from_group(self, userid, group_id):
def remove_user_from_group(self, userid : str, group_id : str) -> bool:
data = {"groupid": group_id}
url = self.apiurl + "users/" + userid + "/groups?format=json"
@ -321,9 +336,10 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
# TODO: Improve typing of these functions...
def add_user_with_groups(
self, userid, userpassword, quota=False, groups=[], email="", displayname=""
):
self, userid : str, userpassword : str, quota : Any=False, groups : Any=[], email : str="", displayname : str=""
) -> bool:
data = {
"userid": userid,
"password": userpassword,
@ -352,7 +368,7 @@ class Nextcloud:
raise ProviderItemExists
if result["ocs"]["meta"]["statuscode"] == 104:
# self.add_group(group)
None
pass
# raise ProviderGroupNotExists
log.error("Get Nextcloud provider user add error: " + str(result))
raise ProviderOpError
@ -368,7 +384,7 @@ class Nextcloud:
# 106 - no group specified (required for subadmins)
# 107 - all errors that contain a hint - for example “Password is among the 1,000,000 most common ones. Please make it unique.” (this code was added in 12.0.6 & 13.0.1)
def delete_user(self, userid):
def delete_user(self, userid : str) -> bool:
url = self.apiurl + "users/" + userid + "?format=json"
try:
result = json.loads(self._request("DELETE", url))
@ -384,13 +400,13 @@ class Nextcloud:
# 100 - successful
# 101 - failure
def enable_user(self, userid):
None
def enable_user(self, userid : str) -> None:
pass
def disable_user(self, userid):
None
def disable_user(self, userid : str) -> None:
pass
def exists_user_folder(self, userid, userpassword, folder="IsardVDI"):
def exists_user_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> bool:
auth = (userid, userpassword)
url = self.davurl + userid + "/" + folder + "?format=json"
headers = {
@ -407,7 +423,7 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
def add_user_folder(self, userid, userpassword, folder="IsardVDI"):
def add_user_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> bool:
auth = (userid, userpassword)
url = self.davurl + userid + "/" + folder + "?format=json"
headers = {
@ -429,7 +445,7 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
def exists_user_share_folder(self, userid, userpassword, folder="IsardVDI"):
def exists_user_share_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> Dict[str, str]:
auth = (userid, userpassword)
url = self.shareurl + "shares?format=json"
headers = {
@ -449,7 +465,7 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
def add_user_share_folder(self, userid, userpassword, folder="IsardVDI"):
def add_user_share_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> Dict[str, str]:
auth = (userid, userpassword)
data = {"path": "/" + folder, "shareType": 3}
url = self.shareurl + "shares?format=json"
@ -477,10 +493,10 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
def get_group(self, userid):
None
def get_group(self, userid : str) -> None:
pass
def get_groups_list(self):
def get_groups_list(self) -> List[Any]:
url = self.apiurl + "groups?format=json"
try:
result = json.loads(self._request("GET", url))
@ -491,7 +507,7 @@ class Nextcloud:
# log.error(traceback.format_exc())
raise
def add_group(self, groupid):
def add_group(self, groupid : str) -> bool:
data = {"groupid": groupid}
url = self.apiurl + "groups?format=json"
headers = {
@ -515,7 +531,7 @@ class Nextcloud:
# 102 - group already exists
# 103 - failed to add the group
def delete_group(self, groupid):
def delete_group(self, groupid : str) -> bool:
group = urllib.parse.quote(groupid, safe="")
url = self.apiurl + "groups/" + group + "?format=json"
headers = {
@ -538,7 +554,7 @@ class Nextcloud:
# 102 - group already exists
# 103 - failed to add the group
def set_user_mail(self, data):
def set_user_mail(self, data : DDUser) -> None:
query = """SELECT * FROM "oc_mail_accounts" WHERE "email" = '%s'"""
sql_query = sql.SQL(query.format(data["email"]))
if not len(self.nextcloud_pg.select(sql_query)):

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -18,54 +19,41 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import json
import logging as log
import time
import traceback
from datetime import datetime, timedelta
import psycopg2
import yaml
import psycopg2.sql
from psycopg2.extensions import connection, cursor
# from admin import app
from typing import Any, List, Tuple, Union
query = Union[str, psycopg2.sql.SQL]
class Postgres:
def __init__(self, host, database, user, password):
# TODO: Fix this whole class
cur : cursor
conn : connection
def __init__(self, host : str, database : str, user : str, password : str) -> None:
self.conn = psycopg2.connect(
host=host, database=database, user=user, password=password
)
# def __del__(self):
# self.cur.close()
# self.conn.close()
def select(self, sql):
def select(self, sql: query) -> List[Tuple[Any, ...]]:
self.cur = self.conn.cursor()
self.cur.execute(sql)
data = self.cur.fetchall()
self.cur.close()
self.cur.close() # type: ignore # psycopg2 type hint missing
return data
def update(self, sql):
def update(self, sql : query) -> None:
self.cur = self.conn.cursor()
self.cur.execute(sql)
self.conn.commit()
self.cur.close()
self.cur.close() # type: ignore # psycopg2 type hint missing
# return self.cur.fetchall()
def select_with_headers(self, sql):
def select_with_headers(self, sql : query) -> Tuple[List[Any], List[Tuple[Any, ...]]]:
self.cur = self.conn.cursor()
self.cur.execute(sql)
data = self.cur.fetchall()
fields = [a.name for a in self.cur.description]
self.cur.close()
self.cur.close() # type: ignore # psycopg2 type hint missing
return (fields, data)
# def update_moodle_saml_plugin(self):
# plugin[('idpmetadata', '<md:EntitiesDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" Name="urn:keycloak"><md:EntityDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" entityID="https://sso.'+app.config['DOMAIN']+'/auth/realms/master"><md:IDPSSODescriptor WantAuthnRequestsSigned="true" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><md:KeyDescriptor use="signing"><ds:KeyInfo><ds:KeyName>NrtA5ynG0htowP3SXw7dBJRIAMxn-1PwuuXwOwNhlRw</ds:KeyName><ds:X509Data><ds:X509Certificate>MIICmzCCAYMCBgF5jb0RCTANBgkqhkiG9w0BAQsFADARMQ8wDQYDVQQDDAZtYXN0ZXIwHhcNMjEwNTIxMDcwMjI4WhcNMzEwNTIxMDcwNDA4WjARMQ8wDQYDVQQDDAZtYXN0ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCI8xh/C0+frz3kgWiUbziTDls71R2YiXLSVE+bw7gbEgZUGCLhoEI679azMtIxmnzM/snIX+yTb12+XoYkgbiLTMPQfnH+Kiab6g3HL3KPfhqS+yWkFxOoCp6Ibmp7yPlVWuHH+MBfO8OBr/r8Ao7heFbuzjiLd1KG67rcoaxfDgMuBoEomg1bgEjFgHaQIrSC6OZzH0h987/arqufZXeXlfyiqScMPUi+u5IpDWSwz06UKP0k8mxzNSlpZ93CKOUSsV0SMLxqg7FQ3SGiOk577bGW9o9BDTkkmSo3Up6smc0LzwvvUwuNd0B1irGkWZFQN9OXJnJYf1InEebIMtmPAgMBAAEwDQYJKoZIhvcNAQELBQADggEBADM34+qEGeBQ22luphVTuVJtGxcbxLx7DfsT0QfJD/OuxTTbNAa1VRyarb5juIAkqdj4y2quZna9ZXLecVo4RkwpzPoKoAkYA8b+kHnWqEwJi9iPrDvKb+GR0bBkLPN49YxIZ8IdKX/PRa3yuLHe+loiNsCaS/2ZK2KO46COsqU4QX1iVhF9kWphNLybjNAX45B6cJLsa1g0vXLdm3kv3SB4I2fErFVaOoDtFIjttoYlXdpUiThkPXBfr7N67P3dZHaS4tjJh+IZ8I6TINpcsH8dBkUhzYEIPHCePwSiC1w6WDBLNDuKt1mj1CZrLq+1x+Yhrs+QNRheEKGi89HZ8N0=</ds:X509Certificate></ds:X509Data></ds:KeyInfo></md:KeyDescriptor><md:ArtifactResolutionService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml/resolve" index="0"/><md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:persistent</md:NameIDFormat><md:NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</md:NameIDFormat><md:NameIDFormat>urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified</md:NameIDFormat><md:NameIDFormat>urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress</md:NameIDFormat><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/></md:IDPSSODescriptor></md:EntityDescriptor></md:EntitiesDescriptor>')]
# pg_update = """UPDATE mdl_config_plugins set title = %s where plugin = auth_saml2 and name ="""
# cursor.execute(pg_update, (title, bookid))
# connection.commit()
# count = cursor.rowcount
# print(count, "Successfully Updated!")

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -23,8 +24,6 @@ import logging as log
import os
import random
# from .keycloak import Keycloak
# from .moodle import Moodle
import string
import time
import traceback
@ -33,13 +32,16 @@ from datetime import datetime, timedelta
import psycopg2
import yaml
from admin import app
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from .postgres import Postgres
class Postup:
def __init__(self):
def __init__(self, app: "AdminFlaskApp") -> None:
ready = False
while not ready:
try:
@ -93,9 +95,9 @@ class Postup:
self.select_and_configure_theme()
self.configure_tipnc()
self.add_moodle_ws_token()
self.add_moodle_ws_token(app)
def select_and_configure_theme(self, theme="cbe"):
def select_and_configure_theme(self, theme : str="cbe") -> None:
try:
self.pg.update(
"""UPDATE "mdl_config" SET value = '%s' WHERE "name" = 'theme';"""
@ -104,7 +106,6 @@ class Postup:
except:
log.error(traceback.format_exc())
exit(1)
None
try:
self.pg.update(
@ -127,9 +128,8 @@ class Postup:
except:
log.error(traceback.format_exc())
exit(1)
None
def configure_tipnc(self):
def configure_tipnc(self) -> None:
try:
self.pg.update(
"""UPDATE "mdl_config_plugins" SET value = '%s' WHERE "plugin" = 'assignsubmission_tipnc' AND "name" = 'host';"""
@ -155,9 +155,8 @@ class Postup:
except:
log.error(traceback.format_exc())
exit(1)
None
def add_moodle_ws_token(self):
def add_moodle_ws_token(self, app: "AdminFlaskApp") -> None:
try:
token = self.pg.select(
"""SELECT * FROM "mdl_external_tokens" WHERE "externalserviceid" = 3"""
@ -166,7 +165,7 @@ class Postup:
return
except:
# log.error(traceback.format_exc())
None
pass
try:
self.pg.update(
@ -225,4 +224,3 @@ class Postup:
except:
log.error(traceback.format_exc())
exit(1)
None

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -19,6 +20,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import json
import logging as log
from operator import itemgetter
import os
import socket
import sys
@ -27,302 +29,307 @@ import traceback
from flask import request
from admin import app
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from ..lib.api_exceptions import Error
from .decorators import has_token
from .decorators import has_token, OptionalJsonResponse
## LISTS
@app.route("/ddapi/users", methods=["GET"])
@has_token
def ddapi_users():
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
users = []
for user in sorted_users:
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
def setup_api_views(app : "AdminFlaskApp") -> None:
## LISTS
@app.json_route("/ddapi/users", methods=["GET"])
@has_token
def ddapi_users() -> OptionalJsonResponse:
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
users = []
for user in sorted_users:
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/ddapi/users/filter", methods=["POST"])
@has_token
def ddapi_users_search() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
if not data.get("text"):
raise Error("bad_request", "Incorrect data requested.")
users = app.admin.get_mix_users()
result = [user_parser(user) for user in filter_users(users, data["text"])]
sorted_result = sorted(result, key=itemgetter("id"))
return json.dumps(sorted_result), 200, {"Content-Type": "application/json"}
return None
@app.route("/ddapi/users/filter", methods=["POST"])
@has_token
def ddapi_users_search():
if request.method == "POST":
data = request.get_json(force=True)
if not data.get("text"):
raise Error("bad_request", "Incorrect data requested.")
users = app.admin.get_mix_users()
result = [user_parser(user) for user in filter_users(users, data["text"])]
sorted_result = sorted(result, key=lambda k: k["id"])
return json.dumps(sorted_result), 200, {"Content-Type": "application/json"}
@app.json_route("/ddapi/groups", methods=["GET"])
@has_token
def ddapi_groups() -> OptionalJsonResponse:
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=itemgetter("name"))
groups = []
for group in sorted_groups:
groups.append(group_parser(group))
return json.dumps(groups), 200, {"Content-Type": "application/json"}
return None
@app.route("/ddapi/groups", methods=["GET"])
@has_token
def ddapi_groups():
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: k["name"])
groups = []
for group in sorted_groups:
groups.append(group_parser(group))
return json.dumps(groups), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/group/users", methods=["POST"])
@has_token
def ddapi_group_users():
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
if data.get("id"):
group_users = [
user_parser(user)
for user in sorted_users
if data.get("id") in user["keycloak_groups"]
]
elif data.get("path"):
try:
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["path"] == data.get("path")
][0]
@app.json_route("/ddapi/group/users", methods=["POST"])
@has_token
def ddapi_group_users() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
if data.get("id"):
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
if data.get("id") in user["keycloak_groups"]
]
except:
raise Error("not_found", "Group path not found in system")
elif data.get("keycloak_id"):
try:
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["id"] == data.get("keycloak_id")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
raise Error("not_found", "Group keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(group_users), 200, {"Content-Type": "application/json"}
elif data.get("path"):
try:
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["path"] == data.get("path")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
raise Error("not_found", "Group path not found in system")
elif data.get("keycloak_id"):
try:
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["id"] == data.get("keycloak_id")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
raise Error("not_found", "Group keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(group_users), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/ddapi/roles", methods=["GET"])
@has_token
def ddapi_roles() -> OptionalJsonResponse:
if request.method == "GET":
roles = []
for role in sorted(app.admin.get_roles(), key=itemgetter("name")):
log.error(role)
roles.append(
{
"keycloak_id": role["id"],
"id": role["name"],
"name": role["name"],
"description": role.get("description", ""),
}
)
return json.dumps(roles), 200, {"Content-Type": "application/json"}
return None
@app.route("/ddapi/roles", methods=["GET"])
@has_token
def ddapi_roles():
if request.method == "GET":
roles = []
for role in sorted(app.admin.get_roles(), key=lambda k: k["name"]):
log.error(role)
roles.append(
{
"keycloak_id": role["id"],
"id": role["name"],
"name": role["name"],
"description": role.get("description", ""),
}
)
return json.dumps(roles), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/role/users", methods=["POST"])
@has_token
def ddapi_role_users():
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
if data.get("id", data.get("name")):
role_users = [
user_parser(user)
for user in sorted_users
if data.get("id", data.get("name")) in user["roles"]
]
elif data.get("keycloak_id"):
try:
id = [
r["id"]
for r in app.admin.get_roles()
if r["id"] == data.get("keycloak_id")
][0]
@app.json_route("/ddapi/role/users", methods=["POST"])
@has_token
def ddapi_role_users() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
if data.get("id", data.get("name")):
role_users = [
user_parser(user) for user in sorted_users if id in user["roles"]
user_parser(user)
for user in sorted_users
if data.get("id", data.get("name")) in user["roles"]
]
except:
raise Error("not_found", "Role keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(role_users), 200, {"Content-Type": "application/json"}
elif data.get("keycloak_id"):
try:
id = [
r["id"]
for r in app.admin.get_roles()
if r["id"] == data.get("keycloak_id")
][0]
role_users = [
user_parser(user) for user in sorted_users if id in user["roles"]
]
except:
raise Error("not_found", "Role keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(role_users), 200, {"Content-Type": "application/json"}
return None
## INDIVIDUAL ACTIONS
@app.route("/ddapi/user", methods=["POST"])
@app.route("/ddapi/user/<user_ddid>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_user(user_ddid=None):
if request.method == "GET":
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
return json.dumps(user_parser(user)), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
app.admin.delete_user(user["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["user"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: ",
+str(app.validators["user"].errors),
traceback.format_exc(),
)
if app.admin.get_user_username(data["username"]):
raise Error("conflict", "User id already exists")
data = app.validators["user"].normalized(data)
keycloak_id = app.admin.add_user(data)
if not keycloak_id:
raise Error(
"precondition_required",
"Not all user groups already in system. Please create user groups before adding user.",
)
return (
json.dumps({"keycloak_id": keycloak_id}),
200,
{"Content-Type": "application/json"},
)
if request.method == "PUT":
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
data = request.get_json(force=True)
if not app.validators["user_update"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: "
+ str(app.validators["user_update"].errors),
traceback.format_exc(),
)
data = {**user, **data}
data = app.validators["user_update"].normalized(data)
data = {**data, **{"username": user_ddid}}
data["roles"] = [data.pop("role")]
data["firstname"] = data.pop("first")
data["lastname"] = data.pop("last")
app.admin.user_update(data)
if data.get("password"):
app.admin.user_update_password(
user["id"], data["password"], data["password_temporary"]
)
return json.dumps({}), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/username/<old_user_ddid>/<new_user_did>", methods=["PUT"])
@has_token
def ddapi_username(old_user_ddid, new_user_did):
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
# user = app.admin.update_user_username(old_user_ddid,new_user_did)
return json.dumps("Not implemented yet!"), 419, {"Content-Type": "application/json"}
@app.route("/ddapi/group", methods=["POST"])
@app.route("/ddapi/group/<id>", methods=["GET", "POST", "DELETE"])
# @app.route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_group(id=None):
if request.method == "GET":
group = app.admin.get_group_by_name(id)
if not group:
Error("not found", "Group id not found")
return (
json.dumps(group_parser(group)),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["group"].validate(data):
raise Error(
"bad_request",
"Data validation for group failed: "
+ str(app.validators["group"].errors),
traceback.format_exc(),
)
data = app.validators["group"].normalized(data)
data["parent"] = data["parent"] if data["parent"] != "" else None
if app.admin.get_group_by_name(id):
raise Error("conflict", "Group id already exists")
path = app.admin.add_group(data)
# log.error(path)
# keycloak_id = app.admin.get_group_by_name(id)["id"]
# log.error()
return (
json.dumps({"keycloak_id": None}),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
group = app.admin.get_group_by_name(id)
if not group:
raise Error("not_found", "Group id not found")
app.admin.delete_group_by_id(group["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/user_mail", methods=["POST"])
@app.route("/ddapi/user_mail/<id>", methods=["GET", "DELETE"])
@has_token
def ddapi_user_mail(id=None):
if request.method == "GET":
return (
json.dumps("Not implemented yet"),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
# if not app.validators["mails"].validate(data):
# raise Error(
# "bad_request",
# "Data validation for mail failed: "
# + str(app.validators["mail"].errors),
# traceback.format_exc(),
# )
for user in data:
if not app.validators["mail"].validate(user):
## INDIVIDUAL ACTIONS
@app.json_route("/ddapi/user", methods=["POST"])
@app.json_route("/ddapi/user/<user_ddid>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_user(user_ddid : Optional[str]=None) -> OptionalJsonResponse:
uid : str = user_ddid if user_ddid else ''
if request.method == "GET":
user = app.admin.get_user_username(uid)
if not user:
raise Error("not_found", "User id not found")
return json.dumps(user_parser(user)), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
user = app.admin.get_user_username(uid)
if not user:
raise Error("not_found", "User id not found")
app.admin.delete_user(user["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["user"].validate(data):
raise Error(
"bad_request",
"Data validation for mail failed: "
+ str(app.validators["mail"].errors),
"Data validation for user failed: ",
+str(app.validators["user"].errors),
traceback.format_exc(),
)
for user in data:
log.info("Added user email")
app.admin.set_nextcloud_user_mail(user)
return (
json.dumps("Users emails updated"),
200,
{"Content-Type": "application/json"},
)
if app.admin.get_user_username(data["username"]):
raise Error("conflict", "User id already exists")
data = app.validators["user"].normalized(data)
keycloak_id = app.admin.add_user(data)
if not keycloak_id:
raise Error(
"precondition_required",
"Not all user groups already in system. Please create user groups before adding user.",
)
return (
json.dumps({"keycloak_id": keycloak_id}),
200,
{"Content-Type": "application/json"},
)
def user_parser(user):
if request.method == "PUT":
user = app.admin.get_user_username(uid)
if not user:
raise Error("not_found", "User id not found")
data = request.get_json(force=True)
if not app.validators["user_update"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: "
+ str(app.validators["user_update"].errors),
traceback.format_exc(),
)
data = {**user, **data}
data = app.validators["user_update"].normalized(data)
data = {**data, **{"username": uid}}
data["roles"] = [data.pop("role")]
data["firstname"] = data.pop("first")
data["lastname"] = data.pop("last")
app.admin.user_update(data)
if data.get("password"):
app.admin.user_update_password(
user["id"], data["password"], data["password_temporary"]
)
return json.dumps({}), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/ddapi/username/<old_user_ddid>/<new_user_did>", methods=["PUT"])
@has_token
def ddapi_username(old_user_ddid : str, new_user_did : str) -> OptionalJsonResponse:
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
# user = app.admin.update_user_username(old_user_ddid,new_user_did)
return json.dumps("Not implemented yet!"), 419, {"Content-Type": "application/json"}
@app.json_route("/ddapi/group", methods=["POST"])
@app.json_route("/ddapi/group/<group_id>", methods=["GET", "POST", "DELETE"])
# @app.json_route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_group(group_id : Optional[str]=None) -> OptionalJsonResponse:
uid : str = group_id if group_id else ''
if request.method == "GET":
group = app.admin.get_group_by_name(uid)
if not group:
Error("not found", "Group id not found")
return (
json.dumps(group_parser(group)),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["group"].validate(data):
raise Error(
"bad_request",
"Data validation for group failed: "
+ str(app.validators["group"].errors),
traceback.format_exc(),
)
data = app.validators["group"].normalized(data)
data["parent"] = data["parent"] if data["parent"] != "" else None
if app.admin.get_group_by_name(uid):
raise Error("conflict", "Group id already exists")
path = app.admin.add_group(data)
# log.error(path)
# keycloak_id = app.admin.get_group_by_name(id)["id"]
# log.error()
return (
json.dumps({"keycloak_id": None}),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
group = app.admin.get_group_by_name(uid)
if not group:
raise Error("not_found", "Group id not found")
app.admin.delete_group_by_id(group["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/ddapi/user_mail", methods=["POST"])
@app.json_route("/ddapi/user_mail/<id>", methods=["GET", "DELETE"])
@has_token
def ddapi_user_mail(id : Optional[str]=None) -> OptionalJsonResponse:
if request.method == "GET":
return (
json.dumps("Not implemented yet"),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
# if not app.validators["mails"].validate(data):
# raise Error(
# "bad_request",
# "Data validation for mail failed: "
# + str(app.validators["mail"].errors),
# traceback.format_exc(),
# )
for user in data:
if not app.validators["mail"].validate(user):
raise Error(
"bad_request",
"Data validation for mail failed: "
+ str(app.validators["mail"].errors),
traceback.format_exc(),
)
for user in data:
log.info("Added user email")
app.admin.set_nextcloud_user_mail(user)
return (
json.dumps("Users emails updated"),
200,
{"Content-Type": "application/json"},
)
return None
# TODO: After this line, this is all mostly duplicated from other places...
def user_parser(user : Dict[str, Any]) -> Dict[str, Any]:
return {
"keycloak_id": user["id"],
"id": user["username"],
@ -338,7 +345,7 @@ def user_parser(user):
}
def group_parser(group):
def group_parser(group : Dict[str, str]) -> Dict[str, Any]:
return {
"keycloak_id": group["id"],
"id": group["name"],
@ -348,7 +355,7 @@ def group_parser(group):
}
def filter_users(users, text):
def filter_users(users : Iterable[Dict[str, Any]], text : str) -> List[Dict[str, Any]]:
return [
user
for user in users

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -21,41 +22,44 @@ import os
from flask import flash, redirect, render_template, request, url_for
from flask_login import current_user, login_required, login_user, logout_user
from werkzeug.wrappers import Response
from admin import app
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from ..auth.authentication import *
@app.route("/", methods=["GET", "POST"])
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
if request.form["user"] == "" or request.form["password"] == "":
flash("Can't leave it blank", "danger")
elif request.form["user"].startswith(" "):
flash("Username not found or incorrect password.", "warning")
else:
ram_user = ram_users.get(request.form["user"])
if ram_user and request.form["password"] == ram_user["password"]:
user = User(
{
"id": ram_user["id"],
"password": ram_user["password"],
"role": ram_user["role"],
"active": True,
}
)
login_user(user)
flash("Logged in successfully.", "success")
return redirect(url_for("web_users"))
else:
def setup_login_views(app : "AdminFlaskApp") -> None:
@app.route("/", methods=["GET", "POST"])
@app.route("/login", methods=["GET", "POST"])
def login() -> Response:
if request.method == "POST":
if request.form["user"] == "" or request.form["password"] == "":
flash("Can't leave it blank", "danger")
elif request.form["user"].startswith(" "):
flash("Username not found or incorrect password.", "warning")
return render_template("login.html")
else:
ram_user = ram_users.get(request.form["user"])
if ram_user and request.form["password"] == ram_user["password"]:
user = User(
id = ram_user["id"],
password = ram_user["password"],
role = ram_user["role"],
active = True,
)
login_user(user)
flash("Logged in successfully.", "success")
return redirect(url_for("web_users"))
else:
flash("Username not found or incorrect password.", "warning")
o : Response = app.make_response(render_template("login.html"))
return o
@app.route("/logout", methods=["GET"])
@login_required
def logout():
logout_user()
return redirect(url_for("login"))
@app.route("/logout", methods=["GET"])
@login_required
def logout() -> Response:
logout_user()
return redirect(url_for("login"))

View File

@ -1,39 +0,0 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
#
# This file is part of DD
#
# DD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# DD is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
# from flask_socketio import SocketIO, emit, join_room, leave_room, \
# close_room, rooms, disconnect, send
# from admin import app
# import json
# # socketio = SocketIO(app)
# # from ...start import socketio
# @socketio.on('connect', namespace='//sio')
# def socketio_connect():
# join_room('admin')
# socketio.emit('update',
# json.dumps('Joined'),
# namespace='//sio',
# room='admin')
# @socketio.on('disconnect', namespace='//sio')
# def socketio_domains_disconnect():
# None

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -34,107 +35,110 @@ from flask import (
jsonify,
redirect,
request,
Response,
send_file,
url_for,
)
from flask import render_template as render_template_flask
from flask_login import login_required
from admin import app
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from ..lib.avatars import Avatars
from .decorators import is_admin
avatars = Avatars()
from ..lib.legal import gen_legal_if_not_exists
def render_template(*args, **kwargs):
def render_template(*args : str, **kwargs : str) -> str:
kwargs["DOMAIN"] = os.environ["DOMAIN"]
return render_template_flask(*args, **kwargs)
@app.route("/users")
@login_required
def web_users():
return render_template("pages/users.html", title="Users", nav="Users")
def setup_web_views(app : "AdminFlaskApp") -> None:
@app.route("/users")
@login_required
def web_users() -> str:
return render_template("pages/users.html", title="Users", nav="Users")
@app.route("/roles")
@login_required
def web_roles():
return render_template("pages/roles.html", title="Roles", nav="Roles")
@app.route("/roles")
@login_required
def web_roles() -> str:
return render_template("pages/roles.html", title="Roles", nav="Roles")
@app.route("/groups")
@login_required
def web_groups(provider=False):
return render_template("pages/groups.html", title="Groups", nav="Groups")
@app.route("/groups")
@login_required
def web_groups(provider : bool=False) -> str:
return render_template("pages/groups.html", title="Groups", nav="Groups")
@app.route("/avatar/<userid>", methods=["GET"])
@login_required
def avatar(userid):
if userid != "false":
return send_file("../avatars/master-avatars/" + userid, mimetype="image/jpeg")
return send_file("static/img/missing.jpg", mimetype="image/jpeg")
@app.route("/avatar/<userid>", methods=["GET"])
@login_required
def avatar(userid : str) -> Response:
if userid != "false":
return send_file("../avatars/master-avatars/" + userid, mimetype="image/jpeg")
return send_file("static/img/missing.jpg", mimetype="image/jpeg")
@app.route("/dashboard")
@login_required
def dashboard(provider=False):
data = json.loads(requests.get("http://dd-sso-api/json").text)
return render_template(
"pages/dashboard.html", title="Customization", nav="Customization", data=data
)
@app.route("/dashboard")
@login_required
def dashboard(provider : bool=False) -> str:
data = json.loads(requests.get("http://dd-sso-api/json").text)
return render_template(
"pages/dashboard.html", title="Customization", nav="Customization", data=data
)
@app.route("/legal")
@login_required
def legal():
# data = json.loads(requests.get("http://dd-sso-api/json").text)
return render_template("pages/legal.html", title="Legal", nav="Legal", data={})
@app.route("/legal")
@login_required
def legal() -> str:
# data = json.loads(requests.get("http://dd-sso-api/json").text)
return render_template("pages/legal.html", title="Legal", nav="Legal", data="")
@app.route("/legal_text")
def legal_text():
lang = request.args.get("lang")
if not lang or lang not in ["ca","es","en","fr"]:
lang="ca"
gen_legal_if_not_exists(lang)
return render_template("pages/legal/"+lang)
@app.route("/legal_text")
def legal_text() -> str:
lang = request.args.get("lang")
if not lang or lang not in ["ca","es","en","fr"]:
lang="ca"
gen_legal_if_not_exists(app, lang)
return render_template("pages/legal/"+lang)
### SYS ADMIN
### SYS ADMIN
@app.route("/sysadmin/users")
@login_required
@is_admin
def web_sysadmin_users():
return render_template(
"pages/sysadmin/users.html", title="SysAdmin Users", nav="SysAdminUsers"
)
@app.route("/sysadmin/users")
@login_required
@is_admin
def web_sysadmin_users() -> Response:
o : Response = app.make_response(render_template(
"pages/sysadmin/users.html", title="SysAdmin Users", nav="SysAdminUsers"
))
return o
@app.route("/sysadmin/groups")
@login_required
@is_admin
def web_sysadmin_groups():
return render_template(
"pages/sysadmin/groups.html", title="SysAdmin Groups", nav="SysAdminGroups"
)
@app.route("/sysadmin/groups")
@login_required
@is_admin
def web_sysadmin_groups() -> Response:
o : Response = app.make_response(render_template(
"pages/sysadmin/groups.html", title="SysAdmin Groups", nav="SysAdminGroups"
))
return o
@app.route("/sysadmin/external")
@login_required
## SysAdmin role
def web_sysadmin_external():
return render_template(
"pages/sysadmin/external.html", title="External", nav="External"
)
@app.route("/sysadmin/external")
@login_required
## SysAdmin role
def web_sysadmin_external() -> str:
return render_template(
"pages/sysadmin/external.html", title="External", nav="External"
)
@app.route("/sockettest")
def web_sockettest():
return render_template(
"pages/sockettest.html", title="Sockettest Users", nav="SysAdminUsers"
)
@app.route("/sockettest")
def web_sockettest() -> str:
return render_template(
"pages/sockettest.html", title="Sockettest Users", nav="SysAdminUsers"
)

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -20,6 +21,7 @@
import json
import logging as log
from operator import itemgetter
import os
import socket
import sys
@ -28,113 +30,117 @@ import traceback
from flask import request
from admin import app
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
from .decorators import is_internal
from admin.views.decorators import OptionalJsonResponse, is_internal
@app.route("/api/internal/users", methods=["GET"])
@is_internal
def internal_users():
log.error(socket.gethostbyname("dd-apps-wordpress"))
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
users = []
for user in sorted_users:
if not user.get("enabled"):
continue
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
def setup_wp_views(app : "AdminFlaskApp") -> None:
@app.json_route("/api/internal/users", methods=["GET"])
@is_internal
def internal_users() -> OptionalJsonResponse:
log.error(socket.gethostbyname("dd-apps-wordpress"))
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
users = []
for user in sorted_users:
if not user.get("enabled"):
continue
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
return None
@app.route("/api/internal/users/filter", methods=["POST"])
@is_internal
def internal_users_search():
if request.method == "POST":
data = request.get_json(force=True)
users = app.admin.get_mix_users()
result = [user_parser(user) for user in filter_users(users, data["text"])]
sorted_result = sorted(result, key=lambda k: k["id"])
return json.dumps(sorted_result), 200, {"Content-Type": "application/json"}
@app.route("/api/internal/groups", methods=["GET"])
@is_internal
def internal_groups():
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: k["name"])
groups = []
for group in sorted_groups:
if not group["path"].startswith("/"):
continue
groups.append(
{
"id": group["path"],
"name": group["name"],
"description": group.get("description", ""),
}
)
return json.dumps(groups), 200, {"Content-Type": "application/json"}
@app.route("/api/internal/group/users", methods=["POST"])
@is_internal
def internal_group_users():
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
users = []
for user in sorted_users:
if data["path"] not in user["keycloak_groups"] or not user.get("enabled"):
continue
users.append(user)
if data.get("text", False) and data["text"] != "":
@app.json_route("/api/internal/users/filter", methods=["POST"])
@is_internal
def internal_users_search() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
users = app.admin.get_mix_users()
result = [user_parser(user) for user in filter_users(users, data["text"])]
else:
result = [user_parser(user) for user in users]
return json.dumps(result), 200, {"Content-Type": "application/json"}
sorted_result = sorted(result, key=itemgetter("id"))
return json.dumps(sorted_result), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/api/internal/groups", methods=["GET"])
@is_internal
def internal_groups() -> OptionalJsonResponse:
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=itemgetter("name"))
groups = []
for group in sorted_groups:
if not group["path"].startswith("/"):
continue
groups.append(
{
"id": group["path"],
"name": group["name"],
"description": group.get("description", ""),
}
)
return json.dumps(groups), 200, {"Content-Type": "application/json"}
return None
@app.route("/api/internal/roles", methods=["GET"])
@is_internal
def internal_roles():
if request.method == "GET":
roles = []
for role in sorted(app.admin.get_roles(), key=lambda k: k["name"]):
if role["name"] == "admin":
continue
roles.append(
{
"id": role["id"],
"name": role["name"],
"description": role.get("description", ""),
}
)
return json.dumps(roles), 200, {"Content-Type": "application/json"}
@app.json_route("/api/internal/group/users", methods=["POST"])
@is_internal
def internal_group_users() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
users = []
for user in sorted_users:
if data["path"] not in user["keycloak_groups"] or not user.get("enabled"):
continue
users.append(user)
if data.get("text", False) and data["text"] != "":
result = [user_parser(user) for user in filter_users(users, data["text"])]
else:
result = [user_parser(user) for user in users]
return json.dumps(result), 200, {"Content-Type": "application/json"}
return None
@app.json_route("/api/internal/roles", methods=["GET"])
@is_internal
def internal_roles() -> OptionalJsonResponse:
if request.method == "GET":
roles = []
for role in sorted(app.admin.get_roles(), key=itemgetter("name")):
if role["name"] == "admin":
continue
roles.append(
{
"id": role["id"],
"name": role["name"],
"description": role.get("description", ""),
}
)
return json.dumps(roles), 200, {"Content-Type": "application/json"}
return None
@app.route("/api/internal/role/users", methods=["POST"])
@is_internal
def internal_role_users():
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
users = []
for user in sorted_users:
if data["role"] not in user["roles"] or not user.get("enabled"):
continue
users.append(user)
if data.get("text", False) and data["text"] != "":
result = [user_parser(user) for user in filter_users(users, data["text"])]
else:
result = [user_parser(user) for user in users]
return json.dumps(result), 200, {"Content-Type": "application/json"}
@app.json_route("/api/internal/role/users", methods=["POST"])
@is_internal
def internal_role_users() -> OptionalJsonResponse:
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=itemgetter("username"))
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
users = []
for user in sorted_users:
if data["role"] not in user["roles"] or not user.get("enabled"):
continue
users.append(user)
if data.get("text", False) and data["text"] != "":
result = [user_parser(user) for user in filter_users(users, data["text"])]
else:
result = [user_parser(user) for user in users]
return json.dumps(result), 200, {"Content-Type": "application/json"}
return None
def user_parser(user):
def user_parser(user : Dict[str, Any]) -> Dict[str, Any]:
return {
"id": user["username"],
"first": user["first"],
@ -145,7 +151,7 @@ def user_parser(user):
}
def filter_users(users, text):
def filter_users(users : Iterable[Dict[str, Any]], text : str) -> List[Dict[str, Any]]:
return [
user
for user in users

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -25,25 +26,28 @@ import socket
from functools import wraps
from flask import redirect, request, url_for
from werkzeug.wrappers import Response
from flask_login import current_user, logout_user
from jose import jwt
from ..auth.tokens import get_header_jwt_payload
from typing import Any, Callable, Dict, Optional, Tuple
JsonResponse = Tuple[str, int, Dict[str, str]]
OptionalJsonResponse = Optional[JsonResponse]
def is_admin(fn):
def is_admin(fn : Callable[..., Response]) -> Callable[..., Response]:
@wraps(fn)
def decorated_view(*args, **kwargs):
def decorated_view(*args : Any, **kwargs : Any) -> Response:
if current_user.role == "admin":
return fn(*args, **kwargs)
return redirect(url_for("login"))
return decorated_view
def is_internal(fn):
def is_internal(fn : Callable[..., OptionalJsonResponse]) -> Callable[..., OptionalJsonResponse]:
@wraps(fn)
def decorated_view(*args, **kwargs):
def decorated_view(*args : Any, **kwargs : Any) -> OptionalJsonResponse:
remote_addr = (
request.headers["X-Forwarded-For"].split(",")[0]
if "X-Forwarded-For" in request.headers
@ -67,18 +71,18 @@ def is_internal(fn):
return decorated_view
def has_token(fn):
def has_token(fn : Callable[..., Any]) -> Callable[..., Any]:
@wraps(fn)
def decorated(*args, **kwargs):
def decorated(*args : Any, **kwargs : Any) -> Any:
payload = get_header_jwt_payload()
return fn(*args, **kwargs)
return decorated
def is_internal_or_has_token(fn):
def is_internal_or_has_token(fn : Callable[..., Any]) -> Callable[..., Any]:
@wraps(fn)
def decorated_view(*args, **kwargs):
def decorated_view(*args : Any, **kwargs : Any) -> Any:
remote_addr = (
request.headers["X-Forwarded-For"].split(",")[0]
if "X-Forwarded-For" in request.headers
@ -94,9 +98,9 @@ def is_internal_or_has_token(fn):
return decorated_view
def login_or_token(fn):
def login_or_token(fn : Callable[..., Any]) -> Callable[..., Any]:
@wraps(fn)
def decorated_view(*args, **kwargs):
def decorated_view(*args : Any, **kwargs : Any) -> Any:
if current_user.is_authenticated:
return fn(*args, **kwargs)
payload = get_header_jwt_payload()

View File

@ -1,5 +1,6 @@
#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
@ -39,14 +40,17 @@ from flask_socketio import (
send,
)
from admin import app
from admin import get_app
# Set up the app
app = get_app()
app.setup()
app.socketio = SocketIO(app)
@app.socketio.on("connect", namespace="/sio")
@login_required
def socketio_connect():
def socketio_connect() -> None:
if current_user.id:
join_room("admin")
app.socketio.emit(
@ -57,12 +61,12 @@ def socketio_connect():
@app.socketio.on("disconnect", namespace="/sio")
def socketio_disconnect():
def socketio_disconnect() -> None:
leave_room("admin")
@app.socketio.on("connect", namespace="/sio/events")
def socketio_connect():
def socketio_connect() -> None:
jwt = get_token_payload(request.args.get("jwt"))
join_room("events")
@ -75,7 +79,7 @@ def socketio_connect():
@app.socketio.on("disconnect", namespace="/sio/events")
def socketio_events_disconnect():
def socketio_events_disconnect() -> None:
leave_room("events")

View File

@ -43,9 +43,11 @@ services:
- ${DATA_FOLDER}/moodle/saml2:/admin/moodledata/saml2:rw
- ${DATA_FOLDER}/saml_certs:/admin/saml_certs:rw
- ${DATA_FOLDER}/legal:/admin/admin/static/templates/pages/legal:rw
- ${DATA_FOLDER}/dd-admin:/data:rw
env_file:
- .env
environment:
- VERIFY="false" # In development do not verify certificates
- DOMAIN=${DOMAIN}
- MANAGED_EMAIL_DOMAIN=${MANAGED_EMAIL_DOMAIN}
- SECRETS=/data/secret