#!/usr/bin/env python import time ,os from datetime import datetime, timedelta import logging as log import traceback import yaml, json from pprint import pprint from keycloak import KeycloakAdmin from postgres import Postgres from minio import Minio from minio.commonconfig import REPLACE, CopySource from minio.deleteobjects import DeleteObject class DefaultAvatars(): def __init__(self, url="http://isard-sso-keycloak:8080/auth/", username=os.environ['KEYCLOAK_USER'], password=os.environ['KEYCLOAK_PASSWORD'], realm='master', verify=True): self.url=url self.username=username self.password=password self.realm=realm self.verify=verify self.keycloak_pg=Postgres('isard-apps-postgresql','keycloak',os.environ['KEYCLOAK_DB_USER'],os.environ['KEYCLOAK_DB_PASSWORD']) self.mclient = Minio( "isard-sso-avatars:9000", access_key="AKIAIOSFODNN7EXAMPLE", secret_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", secure=False ) self.bucket='master-avatars' self._minio_set_realm() self.update_missing_avatars() def connect(self): self.keycloak_admin = KeycloakAdmin(server_url=self.url, username=self.username, password=self.password, realm_name=self.realm, verify=self.verify) def update_missing_avatars(self): sys_roles=['admin','manager','teacher','student'] for u in self.get_users_without_image(): try: img=[r+'.jpg' for r in sys_roles if r in u['role']][0] except: img='unknown.jpg' self.mclient.fput_object( self.bucket, u['id'], "custom/avatars/"+img, content_type="image/jpeg ", ) log.warning(' AVATARS: Updated avatar for user '+u['username']+' with role '+img.split('.')[0]) def _minio_set_realm(self): if not self.mclient.bucket_exists(self.bucket): self.mclient.make_bucket(self.bucket) def minio_get_objects(self): return [o.object_name for o in self.mclient.list_objects(self.bucket)] def minio_delete_all_objects(self): delete_object_list = map( lambda x: DeleteObject(x.object_name), self.mclient.list_objects(self.bucket), ) errors=self.mclient.remove_objects(self.bucket, delete_object_list) for error in errors: log.error(" AVATARS: Error occured when deleting avatar object", error) def get_users(self): self.connect() users=self.get_users_with_groups_and_roles() return users def get_users_without_image(self): return [u for u in self.get_users() if u['id'] not in self.minio_get_objects()] def get_users_with_groups_and_roles(self): q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, ua.value as quota ,json_agg(g."name") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2 ,json_agg(r.name) as role from user_entity as u left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota' left join user_group_membership as ugm on ugm.user_id = u.id left join keycloak_group as g on g.id = ugm.group_id left join keycloak_group as g_parent on g.parent_group = g_parent.id left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id left join user_role_mapping as rm on rm.user_id = u.id left join keycloak_role as r on r.id = rm.role_id group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, ua.value order by u.username""" (headers,users)=self.keycloak_pg.select_with_headers(q) users_with_lists = [list(l[:-4])+([[]] if l[-4] == [None] else [list(set(l[-4]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-3]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-2]))]) +\ ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users] users_with_lists = [list(l[:-4])+([[]] if l[-4] == [None] else [list(set(l[-4]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-3]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-2]))]) +\ ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users_with_lists] list_dict_users = [dict(zip(headers, r)) for r in users_with_lists] return list_dict_users da=DefaultAvatars()