fix(admin): fixed group deletion recursivity

darta 2022-02-07 13:59:02 +01:00
parent ff9f3bc981
commit 4d003acb3e
3 changed files with 42 additions and 82 deletions

View File

@ -32,7 +32,7 @@ import secrets
from .events import Events from .events import Events
from .exceptions import UserExists, UserNotFound from .exceptions import UserExists, UserNotFound
from .helpers import count_repeated, rand_password, kpath2gids, kpath2kpaths from .helpers import count_repeated, rand_password, kpath2gids, kpath2kpaths, get_kid_from_kpath, get_group_with_childs
MANAGER = os.environ["CUSTOM_ROLE_MANAGER"] MANAGER = os.environ["CUSTOM_ROLE_MANAGER"]
TEACHER = os.environ["CUSTOM_ROLE_TEACHER"] TEACHER = os.environ["CUSTOM_ROLE_TEACHER"]
@ -450,11 +450,6 @@ class Admin:
theuser["nextcloud_groups"] = [] theuser["nextcloud_groups"] = []
theuser["quota"] = False theuser["quota"] = False
theuser["quota_used_bytes"] = False theuser["quota_used_bytes"] = False
# if not len(theuser['roles']):
# log.error(' SKIPPING USER WITHOUT ANY ROLE!!: '+theuser['username']+' . Should be fixed at keycloak level.')
# continue
users.append(theuser) users.append(theuser)
return users return users
@ -1818,26 +1813,33 @@ class Admin:
self.nextcloud.add_group(new_path) self.nextcloud.add_group(new_path)
def delete_group_by_id(self, group_id): def delete_group_by_id(self, group_id):
# TODO: Check if exists (None) ev = Events("Deleting group", "Deleting from keycloak")
group = self.keycloak.get_group_by_id(group_id) try:
keycloak_group = self.keycloak.get_group_by_id(group_id)
except Exception as e:
print(e)
ev.update_text("Error deleting group. Not found in keycloak!")
log.error(' KEYCLOAK GROUPS: Could not delete group '+str(group_id)+' as it does not exist!')
to_be_deleted = [] #{'id': '966ad67c-499a-4f56-bd1d-283691cde0e7', 'name': 'asdgfewfwe', 'path': '/asdgfewfwe', 'attributes': {}, 'realmRoles': [], 'clientRoles': {}, 'subGroups': [], 'access': {'view': True, 'manage': True, 'manageMembership': True}}
# Childs
for internalgroup in self.internal["groups"]: subgroups = get_group_with_childs(keycloak_group)
if internalgroup["name"].startswith(group["name"] + "."):
to_be_deleted.append(internalgroup["name"])
to_be_deleted.append(kpath2gid(group["path"]))
try: try:
self.keycloak.delete_group(group["id"]) self.keycloak.delete_group(group_id)
except: except:
log.error("KEYCLOAK: Could no delete group " + group["path"]) log.error("KEYCLOAK GROUPS: Could no delete group " + group["path"])
return
cohorts = self.moodle.get_cohorts() cohorts = self.moodle.get_cohorts()
for gid in to_be_deleted: for sg in subgroups:
cohort = [c["id"] for c in cohorts if c["name"] == gid] sg_gid=kpath2gid(sg)
cohort = [c["id"] for c in cohorts if c["name"] == sg_gid]
ev.update_text("Deleting from moodle cohort "+sg_gid)
self.moodle.delete_cohorts(cohort) self.moodle.delete_cohorts(cohort)
self.nextcloud.delete_group(gid) ev.update_text("Deleting from nextcloud group "+sg_gid)
self.nextcloud.delete_group(sg_gid)
def delete_group_by_path(self, path): def delete_group_by_path(self, path):
group = self.keycloak.get_group_by_path(path) group = self.keycloak.get_group_by_path(path)

View File

@ -3,6 +3,20 @@ import string
from collections import Counter from collections import Counter
from pprint import pprint from pprint import pprint
def get_recursive_groups(l_groups, l):
for d_group in l_groups:
data = {}
for key, value in d_group.items():
if key == "subGroups":
get_recursive_groups(value, l)
else:
data[key] = value
l.append(data)
return l
def get_group_with_childs(keycloak_group):
return [ g["path"] for g in get_recursive_groups([keycloak_group],[])]
def system_username(username): def system_username(username):
return ( return (
@ -11,21 +25,19 @@ def system_username(username):
else False else False
) )
def system_group(groupname): def system_group(groupname):
return True if groupname in ["admin", "manager", "teacher", "student"] else False return True if groupname in ["admin", "manager", "teacher", "student"] else False
def get_group_from_group_id(group_id, groups): def get_group_from_group_id(group_id, groups):
return next((d for d in groups if d.get("id") == group_id), None) return next((d for d in groups if d.get("id") == group_id), None)
def get_kid_from_kpath(kpath, groups):
ids = [g["id"] for g in groups if g["path"] == kpath]
if not len(ids) or len(ids) > 1: return False
return ids[0]
def get_gid_from_kgroup_id(kgroup_id, groups): def get_gid_from_kgroup_id(kgroup_id, groups):
# print(kgroup_id) return [g["path"].replace("/", ".")[1:] if len(g["path"].split("/")) else g["path"][1:] for g in groups if g["id"] == kgroup_id][0]
# pprint(groups)
# return get_group_from_group_id(kgroup_id,groups)['path'].replace('/','.')[1:]
return [g["path"].replace("/", ".")[1:] for g in groups if g["id"] == kgroup_id][0]
def get_gids_from_kgroup_ids(kgroup_ids, groups): def get_gids_from_kgroup_ids(kgroup_ids, groups):
return [get_gid_from_kgroup_id(kgroup_id, groups) for kgroup_id in kgroup_ids] return [get_gid_from_kgroup_id(kgroup_id, groups) for kgroup_id in kgroup_ids]

View File

@ -11,7 +11,7 @@ from pprint import pprint
import yaml import yaml
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader
from keycloak import KeycloakAdmin from keycloak import KeycloakAdmin
from .helpers import kpath2kpaths from .helpers import kpath2kpaths, get_recursive_groups
# from admin import app # from admin import app
@ -57,24 +57,6 @@ class KeycloakClient:
# from keycloak import KeycloakAdmin # from keycloak import KeycloakAdmin
# keycloak_admin = KeycloakAdmin(server_url="http://isard-sso-keycloak:8080/auth/",username="admin",password="keycloakkeycloak",realm_name="master",verify=False) # keycloak_admin = KeycloakAdmin(server_url="http://isard-sso-keycloak:8080/auth/",username="admin",password="keycloakkeycloak",realm_name="master",verify=False)
######## Example create group and subgroup
# try:
# self.add_group('level1')
# except:
# self.delete_group(self.get_group('/level1')['id'])
# self.add_group('level1')
# self.add_group('level2',parent=self.get_group('/level1')['id'])
# pprint(self.get_groups())
######## Example roles
# try:
# self.add_role('superman')
# except:
# self.delete_role('superman')
# self.add_role('superman')
# pprint(self.get_roles())
""" USERS """ """ USERS """
def get_user_id(self, username): def get_user_id(self, username):
@ -122,17 +104,6 @@ class KeycloakClient:
list_dict_users = [dict(zip(headers, r)) for r in users_with_lists] list_dict_users = [dict(zip(headers, r)) for r in users_with_lists]
# self.connect()
# groups = self.keycloak_admin.get_groups()
# for user in list_dict_users:
# new_user_groups = []
# for group_id in user['group']:
# found = [g for g in groups if g['id'] == group_id][0]
# new_user_groups.append({'id':found['id'],
# 'name':found['name'],
# 'path':found['path']})
# user['group']=new_user_groups
return list_dict_users return list_dict_users
def getparent(self, group_id, data): def getparent(self, group_id, data):
@ -290,36 +261,11 @@ class KeycloakClient:
self.connect() self.connect()
return self.keycloak_admin.get_groups() return self.keycloak_admin.get_groups()
def get_recursive_groups(self, l_groups, l=[]):
for d_group in l_groups:
d = {}
for key, value in d_group.items():
if key == "subGroups":
self.get_recursive_groups(value, l)
else:
d[key] = value
l.append(d)
return l
def get_groups(self, with_subgroups=True): def get_groups(self, with_subgroups=True):
## RETURNS ALL GROUPS in root list ## RETURNS ALL GROUPS in root list
self.connect() self.connect()
groups = self.keycloak_admin.get_groups() groups = self.keycloak_admin.get_groups()
return self.get_recursive_groups(groups) return get_recursive_groups(groups,[])
subgroups = []
subgroups1 = []
# This needs to be recursive function
if with_subgroups:
for group in groups:
if len(group["subGroups"]):
for sg in group["subGroups"]:
subgroups.append(sg)
# for sgroup in subgroups:
# if len(sgroup['subGroups']):
# for sg1 in sgroup['subGroups']:
# subgroups1.append(sg1)
return groups + subgroups + subgroups1
def get_group_by_id(self, group_id): def get_group_by_id(self, group_id):
self.connect() self.connect()