From a41d563f5fcb66c230e8fe783af956ae994379f2 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 2 Jun 2021 12:14:47 +0200 Subject: [PATCH] admin tests --- admin/src/admin/lib/admin.py | 119 ++++++++++++++++------------------ admin/src/admin/lib/moodle.py | 6 +- 2 files changed, 60 insertions(+), 65 deletions(-) diff --git a/admin/src/admin/lib/admin.py b/admin/src/admin/lib/admin.py index 34fe326..26ffa09 100644 --- a/admin/src/admin/lib/admin.py +++ b/admin/src/admin/lib/admin.py @@ -3,7 +3,7 @@ from admin import app from .keycloak_client import KeycloakClient from .moodle import Moodle from .nextcloud import Nextcloud -from .nextcloud_exc import * +from .nextcloud_exc import ProviderItemExists from .avatars import Avatars from .helpers import filter_roles_list, filter_roles_listofdicts @@ -89,7 +89,22 @@ class Admin(): 'roles':[]} log.warning(' SYSTEM READY TO HANDLE CONNECTIONS') # self.test_cohorts() - self.delete_all_moodle_cohorts() + # self.delete_all_moodle_cohorts() + + ########## Testing: get user group + # cids=[c['id'] for c in self.moodle.get_cohorts()] + # relations=self.moodle.get_cohort_members(cids) + # for r in relations: + # print(self.get_internalgroup_from_moodlecohortid(r['cohortid'])['path']) + + # def get_internalgroup_from_moodlecohortid(self,cohort_id): + # for g in self.internal['groups']: + # if not g['moodle']: continue + # if g['moodle_id'] == cohort_id: return g + # return '' + + + ## This function should be moved to postup.py def default_setup(self): @@ -104,7 +119,7 @@ class Admin(): self.keycloak.add_group('admin') ## Add default admin user to group admin (for nextcloud, just in case we go there) admin_uid=self.keycloak_admin.get_user_id('admin') - self.keycloak_admin.group_user_add(uid,gid) + self.keycloak_admin.group_user_add(admin_uid,gid) log.warning('KEYCLOAK: OK') except: log.warning('KEYCLOAK: Seems to be there already') @@ -227,32 +242,32 @@ class Admin(): ### testing - def test_cohorts(self): - cohorts=self.moodle.get_cohorts() + # def test_cohorts(self): + # cohorts=self.moodle.get_cohorts() - testc=[c for c in cohorts if c['name'] in ['teacher','/teacher','student','/student']] - pprint(testc) + # testc=[c for c in cohorts if c['name'] in ['teacher','/teacher','student','/student']] + # pprint(testc) - groups=[] - for u in self.internal['users']: - groups=groups+u['keycloak_groups'] - groups=list(dict.fromkeys(groups)) - pprint(groups) - pprint([g for g in groups if 'teacher' in g['keycloak_groups']]) - exit(1) - total=len(groups) - i=0 - for g in groups: - parts=g.split('/') - subpath='' - for i in range(1,len(parts)): - try: - log.warning(' MOODLE GROUPS: Adding group as cohort ('+str(i)+'/'+str(total)+'): '+subpath) - subpath=subpath+'/'+parts[i] - self.moodle.add_system_cohort(subpath) - except: - log.error('probably exists') - i=i+1 + # groups=[] + # for u in self.internal['users']: + # groups=groups+u['keycloak_groups'] + # groups=list(dict.fromkeys(groups)) + # pprint(groups) + # pprint([g for g in groups if 'teacher' in g['keycloak_groups']]) + # exit(1) + # total=len(groups) + # i=0 + # for g in groups: + # parts=g.split('/') + # subpath='' + # for i in range(1,len(parts)): + # try: + # log.warning(' MOODLE GROUPS: Adding group as cohort ('+str(i)+'/'+str(total)+'): '+subpath) + # subpath=subpath+'/'+parts[i] + # self.moodle.add_system_cohort(subpath) + # except: + # log.error('probably exists') + # i=i+1 ### end testing @@ -263,7 +278,7 @@ class Admin(): return True def get_moodle_users(self): - return [u for u in self.moodle.get_users_with_groups_and_roles() if u['username'] not in ['guest','ddadmin','admin'] and not u['username'].startswith('system')] + return [u for u in self.moodle.get_users_with_groups_and_roles() if u['username'] not in ['guest','ddadmin','admin'] and not u['username'].startswith('system_')] ## TOO SLOW. Not used. # def get_moodle_users(self): @@ -322,7 +337,6 @@ class Admin(): def _get_mix_users(self): kusers=self.get_keycloak_users() - # pprint(kusers) musers=self.get_moodle_users() nusers=self.get_nextcloud_users() @@ -367,6 +381,7 @@ class Admin(): del theuser['groups'] users.append(theuser) + # pprint([u['moodle_groups'] for u in users]) return users def get_roles(self): @@ -420,21 +435,21 @@ class Admin(): moodle_exists=[g for g in mgroups if g['name'] == name] if len(moodle_exists): - thegroup['path']='' + thegroup['path']=moodle_exists[0]['name'] thegroup={**moodle_exists[0], **thegroup} thegroup['moodle']=True - thegroup['moodle_id']=thegroup['id'] + thegroup['moodle_id']=moodle_exists[0]['id'] else: thegroup['moodle']=False - nextcloud_exists=[g for g in ngroups if g == name] + nextcloud_exists=[g for g in ngroups if g == name] if len(nextcloud_exists): nextcloud={"id":nextcloud_exists[0], "name":nextcloud_exists[0], - "path":''} + "path":nextcloud_exists[0]} thegroup={**nextcloud, **thegroup} thegroup['nextcloud']=True - thegroup['nextcloud_id']=thegroup['id'] + thegroup['nextcloud_id']=nextcloud_exists[0] ### is the path else: thegroup['nextcloud']=False @@ -573,7 +588,6 @@ class Admin(): log.error(' MOODLE GROUPS: Group '+subpath+ ' probably already exists') i=i+1 - log.error('getting cohorts') ### Get all existing moodle cohorts cohorts=self.moodle.get_cohorts() @@ -582,23 +596,10 @@ class Admin(): if not u['moodle']: log.error('Creating moodle user: '+u['username']) user=self.moodle.create_user(u['email'],u['username'],'1Random 1String',u['first'],u['last'])[0] - # print(str(user)) - user_id=user['id'] - # [{'id': 5, 'username': 'xkrlzwd'}] + # user_id=user['id'] - # for g in u['keycloak_groups']: - # log.info('Adding moodle user: '+u['username']+' to cohort '+g) - # cohort_id=[c['id'] for c in cohorts if c['name']==g][0] - - # # print(user_id) - # # print(cohort_id) - # self.moodle.add_user_to_cohort(user_id,cohort_id) - - ## Update cohorts on existing - # self.resync_data() - # for u in self.internal['users']: - # if u['moodle']: self.resync_data() + ### Add user to their cohorts (groups) for u in self.internal['users']: total=len(u['keycloak_groups']) index=0 @@ -606,33 +607,25 @@ class Admin(): log.error('for user groups') parts=g.split('/') subpath='' - pprint(parts) + # pprint(parts) for i in range(1,len(parts)): if parts[i] in ['admin','manager','teacher','student']: subpath=parts[i] else: subpath=subpath+'/'+parts[i] - # log.info('Adding moodle user: '+u['username']+' to cohort '+subpath) + try: cohort=[c for c in cohorts if c['name']==subpath][0] except: - pprint(subpath) - log.error(' MOODLE USER GROUPS: keycloak group does not exist as moodle cohort') + # pprint(subpath) + log.error(' MOODLE USER GROUPS: keycloak group '+subpath+' does not exist as moodle cohort. This should not happen. User '+u['username']+ ' not added.') + try: self.moodle.add_user_to_cohort(u['moodle_id'],cohort['id']) except: log.error(' MOODLE USER GROUPS: User '+u['username']+' already exists in cohort '+cohort['name']) index=index+1 self.resync_data() - ### MISING ASSING USER TO ROLE COHORT - - # for g in u['keycloak_groups']: - # log.info('Adding moodle user: '+u['username']+' to cohort '+g) - # cohort_id=[c['id'] for c in cohorts if c['name']==g][0] - - # print(u['moodle_id']) - # print('Adding to cohort id '+str(cohort_id)+' with name '+g) - # pprint(self.moodle.add_user_to_cohort(u['moodle_id'],cohort_id)) def delete_all_moodle_cohorts(self): cohorts=self.moodle.get_cohorts() diff --git a/admin/src/admin/lib/moodle.py b/admin/src/admin/lib/moodle.py index 2a1ac6b..f7206c0 100644 --- a/admin/src/admin/lib/moodle.py +++ b/admin/src/admin/lib/moodle.py @@ -96,6 +96,7 @@ class Moodle(): (headers,users)=self.moodle_pg.select_with_headers(q) users_with_lists = [list(l[:-2])+([[]] if l[-2] == [None] else [list(set(l[-2]))]) + ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users] list_dict_users = [dict(zip(headers, r)) for r in users_with_lists] + # pprint(list_dict_users) return list_dict_users ## NOT USED. Too slow @@ -141,8 +142,9 @@ class Moodle(): user = self.call('core_cohort_add_cohort_members', members=members) return user - def get_cohort_members(self, cohort_id): - members = self.call('core_cohort_get_cohort_members', cohortids=[cohort_id])[0]['userids'] + def get_cohort_members(self, cohort_ids): + members = self.call('core_cohort_get_cohort_members', cohortids=cohort_ids) + #[0]['userids'] return members def delete_cohorts(self, cohortids):