digitaldemocratic/admin/src/admin/lib/admin.py

1897 lines
67 KiB
Python

import json
import logging as log
import os
import traceback
from pprint import pprint
from time import sleep
import diceware
from admin import app
from .avatars import Avatars
from .helpers import (
filter_roles_list,
filter_roles_listofdicts,
get_gids_from_kgroup_ids,
get_group_from_group_id,
gid2kpath,
kpath2gid,
system_username,
)
from .keycloak_client import KeycloakClient
from .moodle import Moodle
from .nextcloud import Nextcloud
from .nextcloud_exc import ProviderItemExists
options = diceware.handle_options(None)
options.wordlist = "cat_ascii"
options.num = 3
import secrets
from .api_exceptions import Error
from .events import Events
from .exceptions import UserExists, UserNotFound
from .helpers import (
count_repeated,
get_group_with_childs,
get_kid_from_kpath,
kpath2gids,
kpath2kpaths,
rand_password,
)
MANAGER = os.environ["CUSTOM_ROLE_MANAGER"]
TEACHER = os.environ["CUSTOM_ROLE_TEACHER"]
STUDENT = os.environ["CUSTOM_ROLE_STUDENT"]
class Admin:
def __init__(self):
self.check_connections()
self.set_custom_roles()
self.overwrite_admins()
self.default_setup()
self.internal = {}
ready = False
while not ready:
try:
self.resync_data()
ready = True
except:
print(traceback.format_exc())
log.error("Could not resync data, waiting for system to be online...")
sleep(2)
self.sync_groups_from_keycloak()
self.external = {"users": [], "groups": [], "roles": []}
log.warning(" Updating missing user avatars with defaults")
self.av = Avatars()
# av.minio_delete_all_objects() # This will reset all avatars on usres
self.av.update_missing_avatars(self.internal["users"])
log.warning(" SYSTEM READY TO HANDLE CONNECTIONS")
def check_connections(self):
ready = False
while not ready:
try:
self.keycloak = KeycloakClient(verify=app.config["VERIFY"])
ready = True
except:
log.error(traceback.format_exc())
log.error("Could not connect to keycloak, waiting to be online...")
sleep(2)
log.warning("Keycloak connected.")
ready = False
while not ready:
try:
self.moodle = Moodle(verify=app.config["VERIFY"])
ready = True
except:
log.error("Could not connect to moodle, waiting to be online...")
sleep(2)
log.warning("Moodle connected.")
ready = False
while not ready:
try:
with open(
os.path.join(
app.root_path,
"../moodledata/saml2/moodle." + app.config["DOMAIN"] + ".pem",
),
"r",
) as pem:
ready = True
except IOError:
log.warning("Could not get moodle SAML2 pem certificate. Retrying...")
sleep(2)
ready = False
while not ready:
try:
self.nextcloud = Nextcloud(verify=app.config["VERIFY"])
ready = True
except:
log.error("Could not connect to nextcloud, waiting to be online...")
sleep(2)
log.warning("Nextcloud connected.")
def set_custom_roles(self):
pass
## This function should be moved to postup.py
def overwrite_admins(self):
log.warning("Setting defaults...")
dduser = os.environ["DDADMIN_USER"]
ddpassword = os.environ["DDADMIN_PASSWORD"]
ddmail = os.environ["DDADMIN_EMAIL"]
### User admin in group admin
try:
log.warning("KEYCLOAK: Adding group admin and user admin to this group")
admin_guid = self.keycloak.add_group("admin")
except:
pass
admin_guid = self.keycloak.get_group_by_path(path="/admin")["id"]
try:
## Add default admin user to group admin
admin_uid = self.keycloak.get_user_id("admin")
self.keycloak.group_user_add(admin_uid, admin_guid)
log.warning("KEYCLOAK: OK")
except:
print(traceback.format_exc())
log.warning("KEYCLOAK: Seems to be there already")
### ddadmin user
try:
log.warning(
"KEYCLOAK: Adding user ddadmin and adding to group and role admin"
)
uid = self.keycloak.add_user(
dduser,
"DD",
"Admin",
ddmail,
ddpassword,
group="admin",
temporary=False,
)
self.keycloak.assign_realm_roles(uid, "admin")
log.warning("KEYCLOAK: OK")
except:
log.warning("KEYCLOAK: Seems to be there already")
try:
log.warning("MOODLE: Adding default group admin")
self.moodle.add_system_cohort("admin", "system admins")
log.warning("MOODLE: OK")
except:
log.warning("MOODLE: Seems to be there already")
try:
log.warning("MOODLE: Adding user ddadmin and adding to siteadmins")
self.moodle.create_user(ddmail, dduser, ddpassword, "DD", "Admin")
uid = self.moodle.get_user_by("username", dduser)["users"][0]["id"]
self.moodle.add_user_to_siteadmin(uid)
log.warning("MOODLE: OK")
except:
log.warning("MOODLE: Seems to be there already")
try:
log.warning("NEXTCLOUD: Adding default group admin")
self.nextcloud.add_group("admin")
log.warning("NEXTCLOUD: OK")
except ProviderItemExists:
log.warning("NEXTCLOUD: Seems to be there already")
try:
log.warning("NEXTCLOUD: Adding user ddadmin and adding to group admin")
self.nextcloud.add_user(
dduser, ddpassword, group="admin", email=ddmail, displayname="DD Admin"
)
log.warning("NEXTCLOUD: OK")
except ProviderItemExists:
log.warning("NEXTCLOUD: Seems to be there already")
except:
log.error(traceback.format_exc())
exit(1)
def default_setup(self):
### Add default roles
try:
log.warning("KEYCLOAK: Adding default roles")
self.keycloak.add_role(MANAGER, "Realm managers")
self.keycloak.add_role(TEACHER, "Realm teachers")
self.keycloak.add_role(STUDENT, "Realm students")
log.warning("KEYCLOAK: OK")
except:
log.warning("KEYCLOAK: Seems to be there already")
#### Add default groups
try:
log.warning("KEYCLOAK: Adding default groups")
self.keycloak.add_group(MANAGER)
self.keycloak.add_group(TEACHER)
self.keycloak.add_group(STUDENT)
log.warning("KEYCLOAK: OK")
except:
log.warning("KEYCLOAK: Seems to be there already")
try:
log.warning("MOODLE: Adding default group manager")
self.moodle.add_system_cohort(MANAGER, "system managers")
log.warning("MOODLE: OK")
except:
log.warning("MOODLE: Seems to be there already")
try:
log.warning("MOODLE: Adding default group teacher")
self.moodle.add_system_cohort(TEACHER, "system teachers")
log.warning("MOODLE: OK")
except:
log.warning("MOODLE: Seems to be there already")
try:
log.warning("MOODLE: Adding default group student")
self.moodle.add_system_cohort(STUDENT, "system students")
log.warning("MOODLE: OK")
except:
log.warning("MOODLE: Seems to be there already")
try:
log.warning("NEXTCLOUD: Adding default group manager")
self.nextcloud.add_group(MANAGER)
log.warning("NEXTCLOUD: OK")
except ProviderItemExists:
log.warning("NEXTCLOUD: Seems to be there already")
try:
log.warning("NEXTCLOUD: Adding default group teacher")
self.nextcloud.add_group(TEACHER)
log.warning("NEXTCLOUD: OK")
except ProviderItemExists:
log.warning("NEXTCLOUD: Seems to be there already")
try:
log.warning("NEXTCLOUD: Adding default group student")
self.nextcloud.add_group(STUDENT)
log.warning("NEXTCLOUD: OK")
except ProviderItemExists:
log.warning("NEXTCLOUD: Seems to be there already")
# try:
# log.warning(
# "KEYCLOAK: Adding default users system_teacher, system_manager and system_student users"
# )
# uid = self.keycloak.add_user(
# "system_manager",
# "Manager",
# "System",
# "fakemanager@fake.com",
# "m@n@g3r",
# group=MANAGER,
# temporary=False,
# )
# self.keycloak.assign_realm_roles(uid, MANAGER)
# uid = self.keycloak.add_user(
# "system_teacher",
# "Teacher",
# "System",
# "faketeacher@fake.com",
# "t3@ch3r",
# group=TEACHER,
# temporary=False,
# )
# self.keycloak.assign_realm_roles(uid, TEACHER)
# uid = self.keycloak.add_user(
# "system_student",
# "Student",
# "System",
# "fakestudent@fake.com",
# "stud3nt",
# group=STUDENT,
# temporary=False,
# )
# self.keycloak.assign_realm_roles(uid, STUDENT)
# log.warning("KEYCLOAK: OK")
# except:
# log.warning("KEYCLOAK: Seems to be there already")
def resync_data(self):
self.internal = {
"users": self._get_mix_users(),
"groups": self._get_mix_groups(),
"roles": self._get_roles(),
}
return True
def get_moodle_users(self):
return [
u
for u in self.moodle.get_users_with_groups_and_roles()
if not system_username(u["username"])
]
## TOO SLOW. Not used.
# def get_moodle_users(self):
# log.warning('Loading moodle users... can take a long time...')
# users = self.moodle.get_users_with_groups_and_roles()
# #self.moodle.get_user_by('email','%%')['users']
# return [{"id":u['id'],
# "username":u['username'],
# "first": u['firstname'],
# "last": u['lastname'],
# "email": u['email'],
# "groups": u['groups'],
# "roles": u['roles']}
# for u in users]
def get_keycloak_users(self):
# log.warning('Loading keycloak users... can take a long time...')
users = self.keycloak.get_users_with_groups_and_roles()
return [
{
"id": u["id"],
"username": u["username"],
"first": u.get("first_name", None),
"last": u.get("last_name", None),
"enabled": u["enabled"],
"email": u.get("email", ""),
"groups": u["group"],
"roles": filter_roles_list(u["role"]),
}
for u in users
if not system_username(u["username"])
]
def get_nextcloud_users(self):
return [
{
"id": u["username"],
"username": u["username"],
"first": u["displayname"].split(" ")[0]
if u["displayname"] is not None
else "",
"last": u["displayname"].split(" ")[1]
if u["displayname"] is not None and len(u["displayname"].split(" ")) > 1
else "",
"email": u.get("email", ""),
"groups": u["groups"],
"roles": False,
"quota": u["quota"],
"quota_used_bytes": str(int(int(u["total_bytes"]) / 1024 / 1024 / 2))
+ " MB"
if u["total_bytes"] != None
else "0 MB",
}
# The theoretical bytes returned do not map to any known unit. The division is just an approach while we investigate.
for u in self.nextcloud.get_users_list()
if u["username"] not in ["guest", "ddadmin", "admin"]
and not u["username"].startswith("system")
]
## TOO SLOW
# def get_nextcloud_users(self):
# log.warning('Loading nextcloud users... can take a long time...')
# users = self.nextcloud.get_users_list()
# users_list=[]
# for user in users:
# u=self.nextcloud.get_user(user)
# users_list.append({"id":u['id'],
# "username":u['id'],
# "first": u['displayname'],
# "last": None,
# "email": u['email'],
# "groups": u['groups'],
# "roles": []})
# return users_list
def get_mix_users(self):
return self.internal["users"]
def _get_mix_users(self):
kgroups = self.keycloak.get_groups()
kusers = self.get_keycloak_users()
musers = self.get_moodle_users()
nusers = self.get_nextcloud_users()
kusers_usernames = [u["username"] for u in kusers]
musers_usernames = [u["username"] for u in musers]
nusers_usernames = [u["username"] for u in nusers]
all_users_usernames = set(
kusers_usernames + musers_usernames + nusers_usernames
)
users = []
for username in all_users_usernames:
theuser = {}
keycloak_exists = [u for u in kusers if u["username"] == username]
if len(keycloak_exists):
theuser = keycloak_exists[0]
theuser["keycloak"] = True
theuser["keycloak_groups"] = get_gids_from_kgroup_ids(
theuser.pop("groups"), kgroups
)
# self.keycloak.get_user_groups_paths(keycloak_exists[0]['id']) #keycloak_exists[0]['groups']
else:
theuser["id"] = False
theuser["keycloak"] = False
theuser["keycloak_groups"] = []
moodle_exists = [u for u in musers if u["username"] == username]
if len(moodle_exists):
theuser = {**moodle_exists[0], **theuser}
theuser["moodle"] = True
theuser["moodle_groups"] = moodle_exists[0]["groups"]
theuser["moodle_id"] = moodle_exists[0]["id"]
else:
theuser["moodle"] = False
theuser["moodle_groups"] = []
nextcloud_exists = [u for u in nusers if u["username"] == username]
if len(nextcloud_exists):
theuser = {**nextcloud_exists[0], **theuser}
theuser["nextcloud"] = True
theuser["nextcloud_groups"] = nextcloud_exists[0]["groups"]
theuser["nextcloud_id"] = nextcloud_exists[0]["id"]
theuser["quota"] = (
theuser["quota"]
if theuser.get("quota", False)
and theuser["quota"] != None
and theuser["quota"] != "none"
else False
)
else:
theuser["nextcloud"] = False
theuser["nextcloud_groups"] = []
theuser["quota"] = False
theuser["quota_used_bytes"] = False
users.append(theuser)
return users
def get_roles(self):
return self.internal["roles"]
def _get_roles(self):
return filter_roles_listofdicts(self.keycloak.get_roles())
def get_group_by_name(self, group_name):
group = [g for g in self.internal["groups"] if g["name"] == group_name]
return group[0] if len(group) else False
def get_keycloak_groups(self):
log.warning("Loading keycloak groups...")
return self.keycloak.get_groups()
def get_moodle_groups(self):
log.warning("Loading moodle groups...")
return self.moodle.get_cohorts()
def get_nextcloud_groups(self):
log.warning("Loading nextcloud groups...")
return self.nextcloud.get_groups_list()
def get_mix_groups(self):
return self.internal["groups"]
def _get_mix_groups(self):
kgroups = self.get_keycloak_groups()
mgroups = self.get_moodle_groups()
ngroups = self.get_nextcloud_groups()
kgroups = [] if kgroups is None else kgroups
mgroups = [] if mgroups is None else mgroups
ngroups = [] if ngroups is None else ngroups
kgroups_names = [kpath2gid(g["path"]) for g in kgroups]
mgroups_names = [g["name"] for g in mgroups]
ngroups_names = ngroups
all_groups_names = set(kgroups_names + mgroups_names + ngroups_names)
groups = []
for name in all_groups_names:
thegroup = {}
keycloak_exists = [g for g in kgroups if kpath2gid(g["path"]) == name]
if len(keycloak_exists):
# del keycloak_exists[0]['subGroups']
thegroup = keycloak_exists[0]
thegroup["keycloak"] = True
thegroup["name"] = kpath2gid(thegroup["path"])
# thegroup['path']=self.keycloak.get_group_path(keycloak_exists[0]['id'])
else:
thegroup["id"] = False
thegroup["keycloak"] = False
thegroup["name"] = False
thegroup["path"] = False
moodle_exists = [g for g in mgroups if g["name"] == name]
if len(moodle_exists):
# del moodle_exists[0]['idnumber']
# del moodle_exists[0]['descriptionformat']
# del moodle_exists[0]['theme']
# del moodle_exists[0]['visible']
# # thegroup['path']=moodle_exists[0]['name']
# thegroup={**moodle_exists[0], **thegroup}
thegroup["moodle"] = True
thegroup["moodle_id"] = moodle_exists[0]["id"]
thegroup["description"] = moodle_exists[0]["description"]
else:
thegroup["moodle"] = False
nextcloud_exists = [g for g in ngroups if g == name]
if len(nextcloud_exists):
# nextcloud={"id":nextcloud_exists[0],
# "name":nextcloud_exists[0],
# "path":nextcloud_exists[0]}
# thegroup={**nextcloud, **thegroup}
thegroup["nextcloud"] = True
thegroup["nextcloud_id"] = nextcloud_exists[0] ### is the path
else:
thegroup["nextcloud"] = False
groups.append(thegroup)
return groups
def sync_groups_from_keycloak(self):
self.resync_data()
for group in self.internal["groups"]:
if not group["keycloak"]:
if group["moodle"]:
try:
self.moodle.delete_cohorts([group["moodle_id"]])
except:
print(traceback.format_exc())
# log.error('MOODLE: User '+user['username']+' it is not in cohort '+group)
if group["nextcloud"]:
self.nextcloud.delete_group(group["nextcloud_id"])
self.resync_data()
for group in self.internal["groups"]:
if group["keycloak"]:
if not group["moodle"]:
self.moodle.add_system_cohort(group["name"], description="")
if not group["nextcloud"]:
self.nextcloud.add_group(group["name"])
self.resync_data()
def get_external_users(self):
return self.external["users"]
def get_external_groups(self):
return self.external["groups"]
def get_external_roles(self):
return self.external["roles"]
def upload_csv_ug(self, data):
log.warning("Processing uploaded users...")
users = []
total = len(data["data"])
item = 1
ev = Events("Processing uploaded users", total=len(data["data"]))
groups = []
for u in data["data"]:
log.warning(
"Processing ("
+ str(item)
+ "/"
+ str(total)
+ ") uploaded user: "
+ u["username"]
)
user_groups = [g.strip() for g in u["groups"].split(",") if g != ""]
if not len(user_groups):
user_groups = ["/" + u["role"].strip()]
pathslist = []
for group in user_groups:
pathpart = ""
for part in kpath2gid(group).split("."):
if pathpart == "":
pathpart = part
else:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
pathslist.append(u["role"])
groups = groups + user_groups
# pprint(u)
if u["quota"] == "":
u["quota"] = "500 MB"
if u["role"] == "student":
u["quota"] = "500MB"
if u["role"] == "teacher":
u["quota"] = "5 GB"
if u["role"] == "manager":
u["quota"] = "none"
if u["role"] == "admin":
u["quota"] = "none"
if u["quota"].lower() in ["false", "unlimited"]:
u["quota"] = "none"
users.append(
{
"provider": "external",
"id": u["id"].strip(),
"email": u["email"].strip(),
"first": u["firstname"].strip(),
"last": u["lastname"].strip(),
"username": u["username"].strip(),
"groups": user_groups,
"gids": pathslist,
"quota": u["quota"],
"roles": [u["role"].strip()],
"temporary": True
if u["password_temporal"].lower() == "yes"
else False,
"password": self.get_dice_pwd()
if u["password"] == ""
else u["password"],
}
)
item += 1
ev.increment({"name": u["username"]})
self.external["users"] = users
groups = list(dict.fromkeys(groups))
sysgroups = []
for g in groups:
if g != "":
sysgroups.append(
{
"provider": "external",
"id": g,
"name": kpath2gid(g),
"path": g,
"description": "Imported with csv",
}
)
self.external["groups"] = sysgroups
return True
def get_dice_pwd(self):
return diceware.get_passphrase(options=options)
def reset_external(self):
self.external = {"users": [], "groups": [], "roles": []}
return True
def upload_json_ga(self, data):
groups = []
log.warning("Processing uploaded groups...")
try:
ev = Events(
"Processing uploaded groups",
"Group:",
total=len(data["data"]["groups"]),
table="groups",
)
except:
log.error(traceback.format_exc())
for g in data["data"]["groups"]:
try:
group = {
"provider": "external",
"id": g["id"],
"mailid": g["email"].split("@")[0],
"name": g["name"],
"description": g["description"],
}
ev.increment({"name": g["name"], "data": group})
groups.append(group)
except:
pass
self.external["groups"] = groups
log.warning("Processing uploaded users...")
users = []
total = len(data["data"]["users"])
item = 1
ev = Events(
"Processing uploaded users",
"User:",
total=len(data["data"]["users"]),
table="users",
)
for u in data["data"]["users"]:
log.warning(
"Processing ("
+ str(item)
+ "/"
+ str(total)
+ ") uploaded user: "
+ u["primaryEmail"].split("@")[0]
)
new_user = {
"provider": "external",
"id": u["id"],
"email": u["primaryEmail"],
"first": u["name"]["givenName"],
"last": u["name"]["familyName"],
"username": u["primaryEmail"].split("@")[0],
"groups": [u["orgUnitPath"]], ## WARNING: Removing the first
"roles": [],
"password": self.get_dice_pwd(),
}
users.append(new_user)
item += 1
ev.increment({"name": u["primaryEmail"].split("@")[0], "data": new_user})
self.external["users"] = users
## Add groups to users (now they only have their orgUnitPath)
for g in self.external["groups"]:
for useringroup in data["data"]["d_members"][g["mailid"]]:
for u in self.external["users"]:
if u["id"] == useringroup["id"]:
u["groups"] = u["groups"] + [g["name"]]
return True
def sync_external(self, ids):
# self.resync_data()
log.warning("Starting sync to keycloak")
self.sync_to_keycloak_external()
### Now we only sycn external to keycloak and then they can be updated to others with UI buttons
log.warning("Starting sync to moodle")
self.sync_to_moodle_external()
log.warning("Starting sync to nextcloud")
self.sync_to_nextcloud_external()
log.warning("All syncs finished. Resyncing from apps...")
self.resync_data()
def add_keycloak_groups(self, groups):
total = len(groups)
i = 0
ev = Events(
"Syncing import groups to keycloak", "Adding group:", total=len(groups)
)
for g in groups:
i = i + 1
log.warning(
" KEYCLOAK GROUPS: Adding group ("
+ str(i)
+ "/"
+ str(total)
+ "): "
+ g
)
ev.increment({"name": g})
self.keycloak.add_group_tree(g)
def sync_to_keycloak_external(
self,
): ### This one works from the external, moodle and nextcloud from the internal
groups = []
for u in self.external["users"]:
groups = groups + u["groups"]
groups = list(dict.fromkeys(groups))
self.add_keycloak_groups(groups)
total = len(self.external["users"])
index = 0
ev = Events(
"Syncing import users to keycloak",
"Adding user:",
total=len(self.external["users"]),
)
for u in self.external["users"]:
index = index + 1
# Add user
log.warning(
" KEYCLOAK USERS: Adding user ("
+ str(index)
+ "/"
+ str(total)
+ "): "
+ u["username"]
)
ev.increment({"name": u["username"], "data": u})
uid = self.keycloak.add_user(
u["username"],
u["first"],
u["last"],
u["email"],
u["password"],
temporary=u["temporary"],
)
self.av.add_user_default_avatar(uid, u["roles"][0])
# Add user to role and group rolename
if len(u["roles"]) != 0:
log.warning(
" KEYCLOAK USERS: Assign user "
+ u["username"]
+ " with initial pwd "
+ u["password"]
+ " to role "
+ u["roles"][0]
)
self.keycloak.assign_realm_roles(uid, u["roles"][0])
gid = self.keycloak.get_group_by_path("/" + u["roles"][0])["id"]
self.keycloak.group_user_add(uid, gid)
# Add user to groups
for group in u["groups"]:
for g in kpath2kpaths(group):
log.warning(
" KEYCLOAK USERS: Assign user "
+ u["username"]
+ " to group "
+ str(g)
)
kuser = self.keycloak.get_group_by_path(path=g)
gid = kuser["id"]
self.keycloak.group_user_add(uid, gid)
# We add it as it is needed for moodle and nextcloud
u["groups"].append(u["roles"][0])
self.resync_data()
def add_moodle_groups(self, groups):
### Create all groups. Skip / in system groups
total = len(groups)
log.warning(groups)
ev = Events("Syncing groups from external to moodle", total=len(groups))
i = 1
for g in groups:
moodle_groups = kpath2gids(g)
for mg in moodle_groups:
try:
log.warning(
" MOODLE GROUPS: Adding group as cohort ("
+ str(i)
+ "/"
+ str(total)
+ "): "
+ mg
)
self.moodle.add_system_cohort(mg)
except Exception as e:
log.warning(
" MOODLE GROUPS: Group " + mg + " probably already exists"
)
i = i + 1
def sync_to_moodle_external(self): # works from the internal (keycloak)
### Process all groups from the users keycloak_groups key
groups = []
for u in self.external["users"]:
groups = groups + u["groups"]
groups = list(dict.fromkeys(groups))
self.add_moodle_groups(groups)
### Get all existing moodle cohorts
cohorts = self.moodle.get_cohorts()
### Create users in moodle
ev = Events(
"Syncing users from external to moodle", total=len(self.internal["users"])
)
for u in self.external["users"]:
log.warning(" MOODLE: Creating moodle user: " + u["username"])
ev.increment({"name": u["username"]})
if u["first"] == "":
u["first"] = "-"
if u["last"] == "":
u["last"] = "-"
try:
self.moodle.create_user(
u["email"],
u["username"],
"*12" + secrets.token_urlsafe(16),
u["first"],
u["last"],
)[0]
except UserExists:
log.warning(" MOODLE: The user: " + u["username"] + " already exsits.")
except:
log.error(" -->> Error creating on moodle the user: " + u["username"])
log.error(traceback.format_exc())
# user_id=user['id']
# self.resync_data()
### Add user to their cohorts (groups)
ev = Events(
"Syncing users groups from external to moodle cohorts",
total=len(self.internal["users"]),
)
cohorts = self.moodle.get_cohorts()
for u in self.external["users"]:
try:
uid = self.moodle.get_user_by("username", u["username"])["users"][0][
"id"
]
for group in u["gids"]:
cohort = [c for c in cohorts if c["name"] == group][0]
self.moodle.add_user_to_cohort(uid, cohort["id"])
except:
log.error("Exception on getting user from moodle: " + u["username"])
log.error(self.moodle.get_user_by("username", u["username"]))
# self.resync_data()
def delete_all_moodle_cohorts(self):
cohorts = self.moodle.get_cohorts()
ids = [c["id"] for c in cohorts]
self.moodle.delete_cohorts(ids)
def add_nextcloud_groups(self, groups):
### Create all groups. Skip / in system groups
total = len(groups)
log.warning(groups)
ev = Events("Syncing groups from external to nextcloud", total=len(groups))
i = 1
for g in groups:
nextcloud_groups = kpath2gids(g)
for ng in nextcloud_groups:
try:
log.warning(
" NEXTCLOUD GROUPS: Adding group ("
+ str(i)
+ "/"
+ str(total)
+ "): "
+ ng
)
self.nextcloud.add_group(ng)
except Exception as e:
log.warning(
" NEXTCLOUD GROUPS: Group " + ng + " probably already exists"
)
i = i + 1
def sync_to_nextcloud_external(self):
groups = []
for u in self.external["users"]:
groups = groups + u["gids"]
groups = list(dict.fromkeys(groups))
self.add_nextcloud_groups(groups)
ev = Events(
"Syncing users from external to nextcloud",
total=len(self.internal["users"]),
)
for u in self.external["users"]:
log.warning(
" NEXTCLOUD USERS: Creating nextcloud user: "
+ u["username"]
+ " with quota "
+ str(u["quota"])
+ " in groups "
+ str(u["gids"])
)
try:
ev.increment({"name": u["username"]})
self.nextcloud.add_user_with_groups(
u["username"],
"*12" + secrets.token_urlsafe(16),
u["quota"],
u["gids"],
u["email"],
u["first"] + " " + u["last"],
)
except ProviderItemExists:
log.warning(
" NEXTCLOUD USERS: User "
+ u["username"]
+ " already exists. Skipping..."
)
continue
except:
log.error(traceback.format_exc())
def sync_to_moodle(self): # works from the internal (keycloak)
### Process all groups from the users keycloak_groups key
groups = []
for u in self.internal["users"]:
groups = groups + u["keycloak_groups"]
groups = list(dict.fromkeys(groups))
ev = Events("Syncing groups from keycloak to moodle", total=len(groups))
pathslist = []
for group in groups:
pathpart = ""
for part in group.split("."):
if pathpart == "":
pathpart = part
else:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
ev.increment({"name": pathpart})
try:
log.info("MOODLE: Adding group " + pathpart)
self.moodle.add_system_cohort(pathpart)
except:
# print(traceback.format_exc())
log.warning(
"MOODLE: Group " + pathpart + " probably already exists."
)
### Get all existing moodle cohorts
cohorts = self.moodle.get_cohorts()
### Create users in moodle
ev = Events(
"Syncing users from keycloak to moodle", total=len(self.internal["users"])
)
for u in self.internal["users"]:
if not u["moodle"]:
log.warning("Creating moodle user: " + u["username"])
ev.increment({"name": u["username"]})
if u["first"] == "":
u["first"] = " "
if u["last"] == "":
u["last"] = " "
try:
self.moodle.create_user(
u["email"],
u["username"],
"*12" + secrets.token_urlsafe(16),
u["first"],
u["last"],
)[0]
except:
log.error(
" -->> Error creating on moodle the user: " + u["username"]
)
# user_id=user['id']
self.resync_data()
ev = Events(
"Syncing users with moodle cohorts", total=len(self.internal["users"])
)
cohorts = self.moodle.get_cohorts()
for u in self.internal["users"]:
groups = u["keycloak_groups"] + [u["roles"][0]]
for path in groups:
try:
cohort = [c for c in cohorts if c["name"] == path][0]
except:
# print(traceback.format_exc())
log.error(
" MOODLE USER GROUPS: keycloak group "
+ path
+ " does not exist as moodle cohort. This should not happen. User "
+ u["username"]
+ " not added."
)
try:
self.moodle.add_user_to_cohort(u["moodle_id"], cohort["id"])
except:
# log.error(traceback.format_exc())
log.warning(
" MOODLE USER GROUPS: User "
+ u["username"]
+ " already exists in cohort "
+ cohort["name"]
)
ev.increment(
{
"name": "User " + u["username"] + " added to moodle cohorts",
"data": [],
}
)
self.resync_data()
def sync_to_nextcloud(self):
groups = []
for u in self.internal["users"]:
groups = groups + u["keycloak_groups"]
groups = list(dict.fromkeys(groups))
total = len(groups)
i = 0
ev = Events("Syncing groups from keycloak to nextcloud", total=len(groups))
for g in groups:
parts = g.split("/")
subpath = ""
for i in range(1, len(parts)):
try:
log.warning(
" NEXTCLOUD GROUPS: Adding group ("
+ str(i)
+ "/"
+ str(total)
+ "): "
+ subpath
)
ev.increment({"name": subpath})
subpath = subpath + "/" + parts[i]
self.nextcloud.add_group(subpath)
except:
log.warning(
"NEXTCLOUD GROUPS: " + subpath + " probably already exists"
)
i = i + 1
ev = Events(
"Syncing users from keycloak to nextcloud",
total=len(self.internal["users"]),
)
for u in self.internal["users"]:
if not u["nextcloud"]:
log.warning(
" NEXTCLOUD USERS: Creating nextcloud user: "
+ u["username"]
+ " in groups "
+ str(u["keycloak_groups"])
)
try:
ev.increment({"name": u["username"]})
self.nextcloud.add_user_with_groups(
u["username"],
"*12" + secrets.token_urlsafe(16),
"500 GB",
u["keycloak_groups"],
u["email"],
u["first"] + " " + u["last"],
)
except ProviderItemExists:
log.warning(
"User " + u["username"] + " already exists. Skipping..."
)
continue
except:
log.error(traceback.format_exc())
def delete_keycloak_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if len(user) and user[0]["keycloak"]:
user = user[0]
keycloak_id = user["id"]
else:
return False
log.warning("Removing keycloak user: " + user["username"])
try:
self.keycloak.delete_user(keycloak_id)
except:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + user["username"])
self.av.delete_user_avatar(userid)
def delete_keycloak_users(self):
total = len(self.internal["users"])
i = 0
ev = Events(
"Deleting users from keycloak",
"Deleting user:",
total=len(self.internal["users"]),
)
for u in self.internal["users"]:
i = i + 1
if not u["keycloak"]:
continue
# Do not remove admin users!!! What to do with managers???
if ["admin"] in u["roles"]:
continue
log.info(
" KEYCLOAK USERS: Removing user ("
+ str(i)
+ "/"
+ str(total)
+ "): "
+ u["username"]
)
try:
ev.increment({"name": u["username"], "data": u})
self.keycloak.delete_user(u["id"])
except:
log.warning(
" KEYCLOAK USERS: Could not remove user: "
+ u["username"]
+ ". Probably already not exists."
)
self.av.minio_delete_all_objects()
def delete_nextcloud_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if len(user) and user[0]["nextcloud"]:
user = user[0]
nextcloud_id = user["nextcloud_id"]
else:
return False
log.warning("Removing nextcloud user: " + user["username"])
try:
self.nextcloud.delete_user(nextcloud_id)
except:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + user["username"])
def delete_nextcloud_users(self):
ev = Events("Deleting users from nextcloud", total=len(self.internal["users"]))
for u in self.internal["users"]:
if u["nextcloud"] and not u["keycloak"]:
if u["roles"] and "admin" in u["roles"]:
continue
log.info("Removing nextcloud user: " + u["username"])
try:
ev.increment({"name": u["username"]})
self.nextcloud.delete_user(u["nextcloud_id"])
except:
log.error(traceback.format_exc())
log.warning("Could not remove user: " + u["username"])
def delete_moodle_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if len(user) and user[0]["moodle"]:
user = user[0]
moodle_id = user["moodle_id"]
else:
return False
log.warning("Removing moodle user: " + user["username"])
try:
self.moodle.delete_users([moodle_id])
except:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + user["username"])
def delete_moodle_users(self):
userids = []
usernames = []
for u in self.internal["users"]:
if u["moodle"] and not u["keycloak"]:
userids.append(u["moodle_id"])
usernames.append(u["username"])
if len(userids):
log.warning("Removing moodle users: " + ",".join(usernames))
try:
self.moodle.delete_users(userids)
app.socketio.emit(
"update",
json.dumps(
{
"status": True,
"item": "user",
"action": "delete",
"itemdata": u,
}
),
namespace="//sio",
room="admin",
)
except:
log.error(traceback.format_exc())
log.warning("Could not remove users: " + ",".join(usernames))
def delete_keycloak_groups(self):
for g in self.internal["groups"]:
if not g["keycloak"]:
continue
# Do not remove admin group. It should not exist in keycloak, only in nextcloud
if g["name"] in ["admin", "manager", "teacher", "student"]:
continue
log.info("Removing keycloak group: " + g["name"])
try:
self.keycloak.delete_group(g["id"])
except:
log.error(traceback.format_exc())
log.warning("Could not remove group: " + g["name"])
def external_roleassign(self, data):
for newuserid in data["ids"]:
for externaluser in self.external["users"]:
if externaluser["id"] == newuserid:
externaluser["roles"] = [data["action"]]
for role in ["admin", "manager", "teacher", "student"]:
try:
externaluser["gids"].remove(role)
# externaluser['groups'].remove('/'+role)
except:
pass
externaluser["gids"].append(data["action"])
return True
def user_update_password(self, userid, password, temporary):
return self.keycloak.update_user_pwd(userid, password, temporary)
def update_users_from_keycloak(self):
kgroups = self.keycloak.get_groups()
users = [
{
"id": u["id"],
"username": u["username"],
"enabled": u["enabled"],
"email": u["email"],
"firstname": u["first"],
"lastname": u["last"],
"groups": get_gids_from_kgroup_ids(u["groups"], kgroups),
"roles": u["roles"],
"quota": "3 GB"
if len(u["roles"]) and u["roles"][0] != "student"
else "500 MB",
}
for u in self.get_keycloak_users()
]
for user in users:
ev = Events(
"Updating users from keycloak", "User:", total=len(users), table="users"
)
self.user_update(user)
ev.increment({"name": user["username"], "data": user["groups"]})
def user_update(self, user):
log.warning("Updating user moodle, nextcloud keycloak")
ev = Events("Updating user", "Updating user in keycloak")
## Get actual user role
try:
internaluser = [
u for u in self.internal["users"] if u["username"] == user["username"]
][0]
except:
raise UserNotFound
if not len(user["roles"]):
user["roles"] = ["student"]
delete_roles = list(
set(["admin", "manager", "teacher", "student"]) - set(user["roles"])
)
### keycloak groups
kadd = list(
set(user["groups"]) - set(internaluser["keycloak_groups"] + delete_roles)
)
kdelete = list(
set(internaluser["keycloak_groups"])
- set(user["groups"])
- set(user["roles"])
)
if user["roles"][0] not in kadd:
kadd.append(user["roles"][0])
### Moodle groups
madd = list(
set(user["groups"]) - set(internaluser["moodle_groups"] + delete_roles)
)
mdelete = list(
set(internaluser["moodle_groups"])
- set(user["groups"])
- set(user["roles"])
)
if user["roles"][0] not in madd:
madd.append(user["roles"][0])
### nextcloud groups
nadd = list(
set(user["groups"]) - set(internaluser["nextcloud_groups"] + delete_roles)
)
ndelete = list(
set(internaluser["nextcloud_groups"])
- set(user["groups"])
- set(user["roles"])
)
if user["roles"][0] not in nadd:
nadd.append(user["roles"][0])
#### Delete recursive to parent from this subgroup
#### DISABLED DELETE BUT KEPT ADD, as we should then check if the user is still in another subgroups of this group. How?
# pathslist=[]
# for group in kdelete:
# pathpart=''
# for part in group.split('.'):
# if pathpart=='':
# pathpart=part
# else:
# pathpart=pathpart+'.'+part
# if repeated_subpaths[pathpart] > 1:
# ## We do not delete it as the user is in multiple subpaths of this path.
# continue
# pathslist.append(pathpart)
# kdelete=pathslist
pathslist = []
for group in kadd:
pathpart = ""
for part in group.split("."):
if pathpart == "":
pathpart = part
else:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
kadd = pathslist
# print('KADD WITH SUBPATHS')
# print(kadd)
# print(count_repeated(kadd))
# print('KDELETE')
# print(kdelete)
# print(count_repeated(kdelete))
# pathslist=[]
# for group in mdelete:
# pathpart=''
# for part in group.split('.'):
# if pathpart=='':
# pathpart=part
# else:
# pathpart=pathpart+'.'+part
# pathslist.append(pathpart)
# mdelete=pathslist
pathslist = []
for group in madd:
pathpart = ""
for part in group.split("."):
if pathpart == "":
pathpart = part
else:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
madd = pathslist
# print('MADD WITH SUBPATHS')
# print(madd)
# print(count_repeated(madd))
# print('MDELETE WITH SUBPATHS')
# print(mdelete)
# print(count_repeated(mdelete))
# pathslist=[]
# for group in ndelete:
# pathpart=''
# for part in group.split('.'):
# if pathpart=='':
# pathpart=part
# else:
# pathpart=pathpart+'.'+part
# pathslist.append(pathpart)
# ndelete=pathslist
pathslist = []
for group in nadd:
pathpart = ""
for part in group.split("."):
if pathpart == "":
pathpart = part
else:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
nadd = pathslist
# print('NADD WITH SUBPATHS')
# print(nadd)
# print(count_repeated(nadd))
# print('NDELETE WITH SUBPATHS')
# print(ndelete)
# print(count_repeated(ndelete))
self.update_keycloak_user(internaluser["id"], user, kdelete, kadd)
ev.update_text("Syncing data from applications...")
self.resync_data()
ev.update_text("Updating user in moodle")
self.update_moodle_user(internaluser["id"], user, mdelete, madd)
ev.update_text("Updating user in nextcloud")
self.update_nextcloud_user(internaluser["id"], user, ndelete, nadd)
ev.update_text("User updated")
return True
def update_keycloak_user(self, user_id, user, kdelete, kadd):
# pprint(self.keycloak.get_user_realm_roles(user_id))
self.keycloak.remove_user_realm_roles(user_id, "student")
self.keycloak.assign_realm_roles(user_id, user["roles"][0])
for group in kdelete:
group_id = self.keycloak.get_group_by_path(gid2kpath(group))["id"]
self.keycloak.group_user_remove(user_id, group_id)
for group in kadd:
group_id = self.keycloak.get_group_by_path(gid2kpath(group))["id"]
self.keycloak.group_user_add(user_id, group_id)
self.keycloak.user_update(
user_id, user["enabled"], user["email"], user["firstname"], user["lastname"]
)
self.resync_data()
return True
def enable_users(self, data):
# data={'id':'','username':''}
ev = Events("Bulk actions", "Enabling user:", total=len(data))
for user in data:
ev.increment({"name": user["username"], "data": user["username"]})
self.keycloak.user_enable(user["id"])
self.resync_data()
def disable_users(self, data):
# data={'id':'','username':''}
ev = Events("Bulk actions", "Disabling user:", total=len(data))
for user in data:
ev.increment({"name": user["username"], "data": user["username"]})
self.keycloak.user_disable(user["id"])
self.resync_data()
def update_moodle_user(self, user_id, user, mdelete, madd):
internaluser = [u for u in self.internal["users"] if u["id"] == user_id][0]
cohorts = self.moodle.get_cohorts()
for group in mdelete:
cohort = [c for c in cohorts if c["name"] == group[0]]
try:
self.moodle.delete_user_in_cohort(
internaluser["moodle_id"], cohort["id"]
)
except:
log.error(
"MOODLE: User " + user["username"] + " it is not in cohort " + group
)
if not internaluser["moodle"]:
self.add_moodle_user(
username=user["username"],
email=user["email"],
first_name=user["firstname"],
last_name=user["lastname"],
)
self.resync_data()
else:
self.moodle.update_user(
username=user["username"],
email=user["email"],
first_name=user["firstname"],
last_name=user["lastname"],
enabled=user["enabled"],
)
for group in madd:
cohort = [c for c in cohorts if c["name"] == group][0]
self.moodle.add_user_to_cohort(internaluser["moodle_id"], cohort["id"])
return True
def add_moodle_user(
self,
username,
email,
first_name,
last_name,
password="*12" + secrets.token_urlsafe(16),
):
log.warning("Creating moodle user: " + username)
ev = Events("Add user", username)
try:
self.moodle.create_user(email, username, password, first_name, last_name)
ev.update_text({"name": "Added to moodle", "data": []})
except UserExists:
log.error(" -->> User already exists")
error = Events("User already exists.", str(se), type="error")
except SystemError as se:
log.error("Moodle create user error: " + str(se))
error = Events("Moodle create user error", str(se), type="error")
except:
log.error(" -->> Error creating on moodle the user: " + username)
print(traceback.format_exc())
error = Events("Internal error", "Check logs", type="error")
def update_nextcloud_user(self, user_id, user, ndelete, nadd):
## TODO: Disable de user? Is really needed? it is disabled in keycloak, so can't login again
## ocs/v1.php/cloud/users/{userid}/disable
internaluser = [u for u in self.internal["users"] if u["id"] == user_id][0]
if not internaluser["nextcloud"]:
try:
self.add_nextcloud_user(
user["username"],
user["email"],
user["quota"],
user["firstname"],
user["lastname"],
user["groups"],
)
except:
log.warning(
"NEXTCLOUD: Ooops! User "
+ user["username"]
+ " seems to be in NC but our db info says not...."
)
self.resync_data()
else:
if not user["quota"] or user["quota"] == "false":
self.nextcloud.update_user(
user["username"],
{
"quota": "none",
"email": user["email"],
"displayname": user["firstname"] + " " + user["lastname"],
},
)
else:
self.nextcloud.update_user(
user["username"],
{
"quota": user["quota"],
"email": user["email"],
"displayname": user["firstname"] + " " + user["lastname"],
},
)
for group in ndelete:
self.nextcloud.remove_user_from_group(user["username"], group)
for group in nadd:
self.nextcloud.add_user_to_group(user["username"], group)
def add_nextcloud_user(
self,
username,
email,
quota,
first_name,
last_name,
groups,
password="*12" + secrets.token_urlsafe(16),
):
log.warning(
" NEXTCLOUD USERS: Creating nextcloud user: "
+ username
+ " in groups "
+ str(groups)
)
ev = Events("Add user", username)
try:
# Quota is "1 GB", "500 MB"
self.nextcloud.add_user_with_groups(
username, password, quota, groups, email, first_name + " " + last_name
)
ev.increment({"name": "Added to nextcloud", "data": []})
except ProviderItemExists:
log.warning(
" NEXTCLOUD USERS: User " + username + " already exists. Skipping..."
)
except:
log.error(traceback.format_exc())
def delete_users(self, data):
ev = Events("Bulk actions", "Deleting users:", total=len(data))
for user in data:
ev.increment({"name": user["username"], "data": user["username"]})
self.delete_user(user["id"])
self.resync_data()
def delete_user(self, userid):
log.warning("Deleting user moodle, nextcloud keycloak")
ev = Events("Deleting user", "Deleting from moodle")
self.delete_moodle_user(userid)
ev.update_text("Deleting from nextcloud")
self.delete_nextcloud_user(userid)
ev.update_text("Deleting from keycloak")
self.delete_keycloak_user(userid)
ev.update_text("Syncing data from applications...")
self.resync_data()
ev.update_text("User deleted")
return True
def get_user(self, userid):
user = [u for u in self.internal["users"] if u["id"] == userid]
if not len(user):
return False
return user[0]
def get_user_username(self, username):
user = [u for u in self.internal["users"] if u["username"] == username]
if not len(user):
return False
return user[0]
def add_user(self, u):
pathslist = []
for group in u["groups"]:
pathpart = ""
for part in group.split("."):
if pathpart == "":
pathpart = part
else:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
### KEYCLOAK
#######################
ev = Events("Add user", u["username"], total=5)
log.warning(" KEYCLOAK USERS: Adding user: " + u["username"])
uid = self.keycloak.add_user(
u["username"],
u["first"],
u["last"],
u["email"],
u["password"],
enabled=u["enabled"],
)
self.av.add_user_default_avatar(uid, u["role"])
# Add user to role and group rolename
log.warning(
" KEYCLOAK USERS: Assign user "
+ u["username"]
+ " with initial pwd "
+ u["password"]
+ " to role "
+ u["role"]
)
self.keycloak.assign_realm_roles(uid, u["role"])
gid = self.keycloak.get_group_by_path(path="/" + u["role"])["id"]
self.keycloak.group_user_add(uid, gid)
ev.increment({"name": "Added to system", "data": []})
# Add user to groups
for path in pathslist:
path = "/" + path.replace(".", "/")
log.warning(
" KEYCLOAK USERS: Assign user " + u["username"] + " to group " + path
)
gid = self.keycloak.get_group_by_path(path=path)["id"]
self.keycloak.group_user_add(uid, gid)
ev.increment({"name": "Added to system groups", "data": []})
pathslist.append(u["role"])
### MOODLE
###############################
# Add user
log.warning("Creating moodle user: " + u["username"])
try:
moodle_id = self.moodle.create_user(
u["email"],
u["username"],
"*12" + secrets.token_urlsafe(16),
u["first"],
u["last"],
)[0]["id"]
ev.increment({"name": "Added to moodle", "data": []})
except UserExists:
log.error(" -->> User already exists")
error = Events("User already exists.", str(se), type="error")
except SystemError as se:
log.error("Moodle create user error: " + str(se))
error = Events("Moodle create user error", str(se), type="error")
except:
log.error(" -->> Error creating on moodle the user: " + u["username"])
print(traceback.format_exc())
error = Events("Internal error", "Check logs", type="error")
# Add user to cohort
## Get all existing moodle cohorts
cohorts = self.moodle.get_cohorts()
for path in pathslist:
print("MOODLE ADD SUBPATH: " + path)
try:
cohort = [c for c in cohorts if c["name"] == path][0]
except:
# print(traceback.format_exc())
log.error(
" MOODLE USER GROUPS: keycloak group "
+ path
+ " does not exist as moodle cohort. This should not happen. User "
+ u["username"]
+ " not added."
)
try:
self.moodle.add_user_to_cohort(moodle_id, cohort["id"])
except:
log.error(
" MOODLE USER GROUPS: User "
+ u["username"]
+ " already exists in cohort "
+ cohort["name"]
)
ev.increment({"name": "Added to moodle cohorts", "data": []})
### NEXTCLOUD
########################3
log.warning(
" NEXTCLOUD USERS: Creating nextcloud user: "
+ u["username"]
+ " in groups "
+ str(list)
)
try:
# Quota is in MB
self.nextcloud.add_user_with_groups(
u["username"],
"*12" + secrets.token_urlsafe(16),
u["quota"],
pathslist,
u["email"],
u["first"] + " " + u["last"],
)
ev.increment({"name": "Added to nextcloud", "data": []})
except ProviderItemExists:
log.warning("User " + u["username"] + " already exists. Skipping...")
except:
log.error(traceback.format_exc())
self.resync_data()
return uid
def add_group(self, g):
# TODO: Check if exists
# We add in keycloak with his name, will be shown in app with full path with dots
if g["parent"] != None:
g["parent"] = gid2kpath(g["parent"])
new_path = self.keycloak.add_group(g["name"], g["parent"])
if g["parent"] != None:
new_path = kpath2gid(new_path["path"])
else:
new_path = g["name"]
self.moodle.add_system_cohort(new_path, description=g["description"])
self.nextcloud.add_group(new_path)
self.resync_data()
return new_path
def delete_group_by_id(self, group_id):
ev = Events("Deleting group", "Deleting from keycloak")
try:
keycloak_group = self.keycloak.get_group_by_id(group_id)
except Exception as e:
print(e)
ev.update_text("Error deleting group. Not found in keycloak!")
log.error(
" KEYCLOAK GROUPS: Could not delete group "
+ str(group_id)
+ " as it does not exist!"
)
raise Error("not_found", "Group " + group_id + " not found.")
# {'id': '966ad67c-499a-4f56-bd1d-283691cde0e7', 'name': 'asdgfewfwe', 'path': '/asdgfewfwe', 'attributes': {}, 'realmRoles': [], 'clientRoles': {}, 'subGroups': [], 'access': {'view': True, 'manage': True, 'manageMembership': True}}
subgroups = get_group_with_childs(keycloak_group)
try:
self.keycloak.delete_group(group_id)
except:
log.error("KEYCLOAK GROUPS: Could no delete group " + group["path"])
return
cohorts = self.moodle.get_cohorts()
for sg in subgroups:
sg_gid = kpath2gid(sg)
cohort = [c["id"] for c in cohorts if c["name"] == sg_gid]
ev.update_text("Deleting from moodle cohort " + sg_gid)
self.moodle.delete_cohorts(cohort)
ev.update_text("Deleting from nextcloud group " + sg_gid)
self.nextcloud.delete_group(sg_gid)
self.resync_data()
def delete_group_by_path(self, path):
group = self.keycloak.get_group_by_path(path)
to_be_deleted = []
# Childs
for internalgroup in self.internal["groups"]:
if internalgroup["name"].startswith(group["name"] + "."):
to_be_deleted.append(internalgroup["name"])
to_be_deleted.append(kpath2gid(group["path"]))
try:
self.keycloak.delete_group(group["id"])
except:
log.error("KEYCLOAK: Could no delete group " + group["path"])
cohorts = self.moodle.get_cohorts()
for gid in to_be_deleted:
cohort = [c["id"] for c in cohorts if c["name"] == gid]
self.moodle.delete_cohorts(cohort)
self.nextcloud.delete_group(gid)
self.resync_data()