Merge branch 'legal-fix' into 'master'

fix(admin): legal pages editing in admin

See merge request isard/isard-sso!84
Josep Maria Viñolas Auquer 2022-06-02 06:33:11 +00:00
commit b03cdd162c
5 changed files with 599 additions and 76 deletions

View File

@ -1,82 +1,21 @@
import logging as log
import os
from pprint import pprint
from minio import Minio
from minio.commonconfig import REPLACE, CopySource
from minio.deleteobjects import DeleteObject
from requests import get, post
import traceback
from admin import app
legal_path= os.path.join(app.root_path, "static/templates/pages/legal/")
class Avatars:
def __init__(self):
self.mclient = Minio(
"isard-sso-avatars:9000",
access_key="AKIAIOSFODNN7EXAMPLE",
secret_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
secure=False,
)
self.bucket = "master-avatars"
self._minio_set_realm()
# self.update_missing_avatars()
def get_legal(lang):
with open(legal_path+lang, "r") as languagefile:
return languagefile.read()
def add_user_default_avatar(self, userid, role="unknown"):
self.mclient.fput_object(
self.bucket,
userid,
os.path.join(app.root_path, "../custom/avatars/" + role + ".jpg"),
content_type="image/jpeg ",
)
log.warning(
" AVATARS: Updated avatar for user " + userid + " with role " + role
)
def gen_legal_if_not_exists(lang):
if not os.path.isfile(legal_path+lang):
log.debug("Creating new language file")
with open(legal_path+lang, "w") as languagefile:
languagefile.write("<b>Legal</b><br>This is the default legal page for language " + lang)
def delete_user_avatar(self, userid):
self.minio_delete_object(userid)
def update_missing_avatars(self, users):
sys_roles = ["admin", "manager", "teacher", "student"]
for u in self.get_users_without_image(users):
try:
img = [r + ".jpg" for r in sys_roles if r in u["roles"]][0]
except:
img = "unknown.jpg"
self.mclient.fput_object(
self.bucket,
u["id"],
os.path.join(app.root_path, "../custom/avatars/" + img),
content_type="image/jpeg ",
)
log.warning(
" AVATARS: Updated avatar for user "
+ u["username"]
+ " with role "
+ img.split(".")[0]
)
def _minio_set_realm(self):
if not self.mclient.bucket_exists(self.bucket):
self.mclient.make_bucket(self.bucket)
def minio_get_objects(self):
return [o.object_name for o in self.mclient.list_objects(self.bucket)]
def minio_delete_all_objects(self):
delete_object_list = map(
lambda x: DeleteObject(x.object_name),
self.mclient.list_objects(self.bucket),
)
errors = self.mclient.remove_objects(self.bucket, delete_object_list)
for error in errors:
log.error(" AVATARS: Error occured when deleting avatar object: " + error)
def minio_delete_object(self, oid):
errors = self.mclient.remove_objects(self.bucket, [DeleteObject(oid)])
for error in errors:
log.error(" AVATARS: Error occured when deleting avatar object: " + error)
def get_users_without_image(self, users):
return [u for u in users if u["id"] and u["id"] not in self.minio_get_objects()]
def new_legal(lang,html):
with open(legal_path+lang, "w") as languagefile:
languagefile.write(html)

View File

@ -18,14 +18,25 @@ $(document).ready(function () {
// })
$("#save-legal").click(function () {
console.log($('#editor-legal').cleanHtml())
console.log($('#legal-lang').val())
$.ajax({
type: "POST",
url: "/api/legal/legal",
data: {
data: JSON.stringify({
'html': $('#editor-legal').cleanHtml(),
'lang': $('#legal-lang').val()
},
}),
success: function () {
new PNotify({
title: "Legal text",
text: "Updated for "+$('#legal-lang').val()+" language",
hide: true,
delay: 3000,
icon: 'fa fa-alert-sign',
opacity: 1,
type: 'info'
});
},
});
});

View File

@ -0,0 +1,563 @@
#!flask/bin/python
# coding=utf-8
import concurrent.futures
import json
import logging as log
import os
import re
import sys
# import Queue
import threading
import time
import traceback
from uuid import uuid4
from flask import Response, jsonify, redirect, render_template, request, url_for
from flask_login import current_user, login_required
from admin import app
from ..lib.helpers import system_group
from .decorators import login_or_token
threads = {"external": None}
# q = Queue.Queue()
from keycloak.exceptions import KeycloakGetError
from ..lib.dashboard import Dashboard
from ..lib.exceptions import UserExists, UserNotFound
dashboard = Dashboard()
from ..lib.legal import get_legal, gen_legal_if_not_exists, new_legal
@app.route("/sysadmin/api/resync")
@app.route("/api/resync")
@login_required
def resync():
return (
json.dumps(app.admin.resync_data()),
200,
{"Content-Type": "application/json"},
)
@app.route("/api/users", methods=["GET", "PUT"])
@app.route("/api/users/<provider>", methods=["POST", "PUT", "GET", "DELETE"])
@login_or_token
def users(provider=False):
if request.method == "DELETE":
if current_user.role != "admin":
return json.dumps({}), 301, {"Content-Type": "application/json"}
if provider == "keycloak":
return (
json.dumps(app.admin.delete_keycloak_users()),
200,
{"Content-Type": "application/json"},
)
if provider == "nextcloud":
return (
json.dumps(app.admin.delete_nextcloud_users()),
200,
{"Content-Type": "application/json"},
)
if provider == "moodle":
return (
json.dumps(app.admin.delete_moodle_users()),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
if current_user.role != "admin":
return json.dumps({}), 301, {"Content-Type": "application/json"}
if provider == "moodle":
return (
json.dumps(app.admin.sync_to_moodle()),
200,
{"Content-Type": "application/json"},
)
if provider == "nextcloud":
return (
json.dumps(app.admin.sync_to_nextcloud()),
200,
{"Content-Type": "application/json"},
)
if request.method == "PUT" and not provider:
if current_user.role != "admin":
return json.dumps({}), 301, {"Content-Type": "application/json"}
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already working with users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.update_users_from_keycloak, args=()
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Add user error."}),
500,
{"Content-Type": "application/json"},
)
# return json.dumps(app.admin.update_users_from_keycloak()), 200, {'Content-Type': 'application/json'}
users = app.admin.get_mix_users()
if current_user.role != "admin":
for user in users:
user["keycloak_groups"] = [
g for g in user["keycloak_groups"] if not system_group(g)
]
return json.dumps(users), 200, {"Content-Type": "application/json"}
@app.route("/api/users_bulk/<action>", methods=["PUT"])
@login_required
def users_bulk(action):
data = request.get_json(force=True)
if request.method == "PUT":
if action == "enable":
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already operating users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.enable_users, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Enable users error."}),
500,
{"Content-Type": "application/json"},
)
if action == "disable":
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already operating users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.disable_users, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Disabling users error."}),
500,
{"Content-Type": "application/json"},
)
if action == "delete":
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already operating users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.delete_users, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Deleting users error."}),
500,
{"Content-Type": "application/json"},
)
return json.dumps({}), 405, {"Content-Type": "application/json"}
# Update pwd
@app.route("/api/user_password", methods=["GET"])
@app.route("/api/user_password/<userid>", methods=["PUT"])
@login_required
def user_password(userid=False):
if request.method == "GET":
return (
json.dumps(app.admin.get_dice_pwd()),
200,
{"Content-Type": "application/json"},
)
if request.method == "PUT":
data = request.get_json(force=True)
password = data["password"]
temporary = data.get("temporary", True)
try:
res = app.admin.user_update_password(userid, password, temporary)
return json.dumps({}), 200, {"Content-Type": "application/json"}
except KeycloakGetError as e:
log.error(e.error_message.decode("utf-8"))
return (
json.dumps({"msg": "Update password error."}),
500,
{"Content-Type": "application/json"},
)
return json.dumps({}), 405, {"Content-Type": "application/json"}
# User
@app.route("/api/user", methods=["POST"])
@app.route("/api/user/<userid>", methods=["PUT", "GET", "DELETE"])
@login_required
def user(userid=None):
if request.method == "DELETE":
app.admin.delete_user(userid)
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "POST":
data = request.get_json(force=True)
if app.admin.get_user_username(data["username"]):
return (
json.dumps({"msg": "Add user error: already exists."}),
409,
{"Content-Type": "application/json"},
)
data["enabled"] = not data.get("enabled", False)
data["quota"] = data["quota"] if data["quota"] != "false" else False
data["groups"] = data["groups"] if data.get("groups", False) else []
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps({"msg": "Precondition failed: already adding users"}),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.add_user, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Add user error."}),
500,
{"Content-Type": "application/json"},
)
if request.method == "PUT":
data = request.get_json(force=True)
data["enabled"] = True if data.get("enabled", False) else False
data["groups"] = data["groups"] if data.get("groups", False) else []
data["roles"] = [data.pop("role-keycloak")]
try:
app.admin.user_update(data)
return json.dumps({}), 200, {"Content-Type": "application/json"}
except UserNotFound:
return (
json.dumps({"msg": "User not found."}),
404,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
pass
if request.method == "GET":
user = app.admin.get_user(userid)
if not user:
return (
json.dumps({"msg": "User not found."}),
404,
{"Content-Type": "application/json"},
)
return json.dumps(user), 200, {"Content-Type": "application/json"}
@app.route("/api/roles")
@login_required
def roles():
sorted_roles = sorted(app.admin.get_roles(), key=lambda k: k["name"])
if current_user.role != "admin":
sorted_roles = [sr for sr in sorted_roles if sr["name"] != "admin"]
return json.dumps(sorted_roles), 200, {"Content-Type": "application/json"}
@app.route("/api/group", methods=["POST", "DELETE"])
@app.route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
@login_required
def group(group_id=False):
if request.method == "POST":
data = request.get_json(force=True)
log.error(data)
data["parent"] = data["parent"] if data["parent"] != "" else None
return (
json.dumps(app.admin.add_group(data)),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
try:
data = request.get_json(force=True)
except:
data = False
if data:
res = app.admin.delete_group_by_path(data["path"])
else:
if not group_id:
return (
json.dumps({"error": "bad_request","msg":"Bad request"}),
400,
{"Content-Type": "application/json"},
)
res = app.admin.delete_group_by_id(group_id)
return json.dumps(res), 200, {"Content-Type": "application/json"}
@app.route("/api/groups")
@app.route("/api/groups/<provider>", methods=["POST", "PUT", "GET", "DELETE"])
@login_required
def groups(provider=False):
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: str(k["name"]))
if current_user.role != "admin":
## internal groups should be avoided as are assigned with the role
sorted_groups = [sg for sg in sorted_groups if not system_group(sg["name"])]
else:
sorted_groups = [sg for sg in sorted_groups]
return json.dumps(sorted_groups), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
if provider == "keycloak":
return (
json.dumps(app.admin.delete_keycloak_groups()),
200,
{"Content-Type": "application/json"},
)
### SYSADM USERS ONLY
@app.route("/api/external", methods=["POST", "PUT", "GET", "DELETE"])
@login_required
def external():
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return json.dumps({}), 301, {"Content-Type": "application/json"}
else:
threads["external"] = None
if request.method == "POST":
data = request.get_json(force=True)
if data["format"] == "json-ga":
threads["external"] = threading.Thread(
target=app.admin.upload_json_ga, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
if data["format"] == "csv-ug":
valid = check_upload_errors(data)
if valid["pass"]:
threads["external"] = threading.Thread(
target=app.admin.upload_csv_ug, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
else:
return json.dumps(valid), 422, {"Content-Type": "application/json"}
if request.method == "PUT":
data = request.get_json(force=True)
threads["external"] = threading.Thread(
target=app.admin.sync_external, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
print("RESET")
app.admin.reset_external()
return json.dumps({}), 200, {"Content-Type": "application/json"}
return json.dumps({}), 500, {"Content-Type": "application/json"}
@app.route("/api/external/users")
@login_required
def external_users_list():
while threads["external"] is not None and threads["external"].is_alive():
time.sleep(0.5)
return (
json.dumps(app.admin.get_external_users()),
200,
{"Content-Type": "application/json"},
)
@app.route("/api/external/groups")
@login_required
def external_groups_list():
while threads["external"] is not None and threads["external"].is_alive():
time.sleep(0.5)
return (
json.dumps(app.admin.get_external_groups()),
200,
{"Content-Type": "application/json"},
)
@app.route("/api/external/roles", methods=["PUT"])
@login_required
def external_roles():
if request.method == "PUT":
return (
json.dumps(app.admin.external_roleassign(request.get_json(force=True))),
200,
{"Content-Type": "application/json"},
)
def check_upload_errors(data):
email_regex = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
for u in data["data"]:
try:
user_groups = [g.strip() for g in u["groups"].split(",")]
except:
resp = {
"pass": False,
"msg": "User " + u["username"] + " has invalid groups: " + u["groups"],
}
log.error(resp)
return resp
if not re.fullmatch(email_regex, u["email"]):
resp = {
"pass": False,
"msg": "User " + u["username"] + " has invalid email: " + u["email"],
}
log.error(resp)
return resp
if u["role"] not in ["admin", "manager", "teacher", "student"]:
if u["role"] == "":
resp = {
"pass": False,
"msg": "User " + u["username"] + " has no role assigned!",
}
log.error(resp)
return resp
resp = {
"pass": False,
"msg": "User " + u["username"] + " has invalid role: " + u["role"],
}
log.error(resp)
return resp
return {"pass": True, "msg": ""}
@app.route("/api/dashboard/<item>", methods=["PUT"])
@login_required
def dashboard_put(item):
if item == "colours":
try:
data = request.get_json(force=True)
dashboard.update_colours(data)
except:
log.error(traceback.format_exc())
return json.dumps({"colours": data}), 200, {"Content-Type": "application/json"}
if item == "menu":
try:
data = request.get_json(force=True)
dashboard.update_menu(data)
except:
log.error(traceback.format_exc())
return json.dumps(data), 200, {"Content-Type": "application/json"}
if item == "logo":
dashboard.update_logo(request.files["croppedImage"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if item == "background":
dashboard.update_background(request.files["croppedImage"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
return (
json.dumps(
{
"error": "update_error",
"msg": "Error updating item " + item + "\n" + traceback.format_exc(),
}
),
500,
{"Content-Type": "application/json"},
)
@app.route("/api/legal/<item>", methods=["GET"])
# @login_required
def legal_get(item):
if request.method == "GET":
if item == "legal":
lang = request.args.get("lang")
if not lang or lang not in ["ca","es","en","fr"]:
lang="ca"
gen_legal_if_not_exists(lang)
return (
json.dumps({"html": get_legal(lang)}),
200,
{"Content-Type": "application/json"},
)
# if item == "privacy":
# return json.dumps({ "html": "<b>Privacy policy</b><br>This works!"}), 200, {'Content-Type': 'application/json'}
@app.route("/api/legal/<item>", methods=["POST"])
@login_required
def legal_put(item):
if request.method == "POST":
if item == "legal":
data = None
try:
data = data = request.get_json(force=True)
html = data["html"]
lang = data["lang"]
if not lang or lang not in ["ca","es","en","fr"]:
lang="ca"
new_legal(lang,html)
except:
log.error(traceback.format_exc())
return json.dumps(data), 200, {"Content-Type": "application/json"}
# if item == "privacy":
# data = None
# try:
# data = request.json
# html = data["html"]
# lang = data["lang"]
# except:
# log.error(traceback.format_exc())
# return json.dumps(data), 200, {'Content-Type': 'application/json'}

View File

@ -29,6 +29,8 @@ from .decorators import is_admin
avatars = Avatars()
from ..lib.legal import gen_legal_if_not_exists
""" OIDC TESTS """
# from ..auth.authentication import oidc
@ -99,6 +101,13 @@ def legal():
# data = json.loads(requests.get("http://isard-sso-api/json").text)
return render_template("pages/legal.html", title="Legal", nav="Legal", data={})
@app.route("/legal_text")
def legal_text():
lang = request.args.get("lang")
if not lang or lang not in ["ca","es","en","fr"]:
lang="ca"
gen_legal_if_not_exists(lang)
return render_template("pages/legal/"+lang)
### SYS ADMIN

View File

@ -22,6 +22,7 @@ services:
- ${DATA_FOLDER}/avatars:/admin/avatars:ro
- ${DATA_FOLDER}/moodle/saml2:/admin/moodledata/saml2:rw
- ${DATA_FOLDER}/saml_certs:/admin/saml_certs:rw
- ${DATA_FOLDER}/legal:/admin/admin/static/templates/pages/legal:rw
env_file:
- .env
environment: