145 lines
4.9 KiB
Python
145 lines
4.9 KiB
Python
#!/usr/bin/env python
|
|
import json
|
|
import logging as log
|
|
import os
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from pprint import pprint
|
|
|
|
import yaml
|
|
from keycloak import KeycloakAdmin
|
|
from minio import Minio
|
|
from minio.commonconfig import REPLACE, CopySource
|
|
from minio.deleteobjects import DeleteObject
|
|
|
|
from postgres import Postgres
|
|
|
|
|
|
class DefaultAvatars:
|
|
def __init__(
|
|
self,
|
|
url="http://isard-sso-keycloak:8080/auth/",
|
|
username=os.environ["KEYCLOAK_USER"],
|
|
password=os.environ["KEYCLOAK_PASSWORD"],
|
|
realm="master",
|
|
verify=True,
|
|
):
|
|
self.url = url
|
|
self.username = username
|
|
self.password = password
|
|
self.realm = realm
|
|
self.verify = verify
|
|
|
|
self.keycloak_pg = Postgres(
|
|
"isard-apps-postgresql",
|
|
"keycloak",
|
|
os.environ["KEYCLOAK_DB_USER"],
|
|
os.environ["KEYCLOAK_DB_PASSWORD"],
|
|
)
|
|
|
|
self.mclient = Minio(
|
|
"isard-sso-avatars:9000",
|
|
access_key="AKIAIOSFODNN7EXAMPLE",
|
|
secret_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
|
|
secure=False,
|
|
)
|
|
self.bucket = "master-avatars"
|
|
self._minio_set_realm()
|
|
self.update_missing_avatars()
|
|
|
|
def connect(self):
|
|
self.keycloak_admin = KeycloakAdmin(
|
|
server_url=self.url,
|
|
username=self.username,
|
|
password=self.password,
|
|
realm_name=self.realm,
|
|
verify=self.verify,
|
|
)
|
|
|
|
def update_missing_avatars(self):
|
|
sys_roles = ["admin", "manager", "teacher", "student"]
|
|
for u in self.get_users_without_image():
|
|
try:
|
|
img = [r + ".jpg" for r in sys_roles if r in u["role"]][0]
|
|
except:
|
|
img = "unknown.jpg"
|
|
|
|
self.mclient.fput_object(
|
|
self.bucket,
|
|
u["id"],
|
|
"custom/avatars/" + img,
|
|
content_type="image/jpeg ",
|
|
)
|
|
log.warning(
|
|
" AVATARS: Updated avatar for user "
|
|
+ u["username"]
|
|
+ " with role "
|
|
+ img.split(".")[0]
|
|
)
|
|
|
|
def _minio_set_realm(self):
|
|
if not self.mclient.bucket_exists(self.bucket):
|
|
self.mclient.make_bucket(self.bucket)
|
|
|
|
def minio_get_objects(self):
|
|
return [o.object_name for o in self.mclient.list_objects(self.bucket)]
|
|
|
|
def minio_delete_all_objects(self):
|
|
delete_object_list = map(
|
|
lambda x: DeleteObject(x.object_name),
|
|
self.mclient.list_objects(self.bucket),
|
|
)
|
|
errors = self.mclient.remove_objects(self.bucket, delete_object_list)
|
|
for error in errors:
|
|
log.error(" AVATARS: Error occured when deleting avatar object", error)
|
|
|
|
def get_users(self):
|
|
self.connect()
|
|
users = self.get_users_with_groups_and_roles()
|
|
return users
|
|
|
|
def get_users_without_image(self):
|
|
return [u for u in self.get_users() if u["id"] not in self.minio_get_objects()]
|
|
|
|
def get_users_with_groups_and_roles(self):
|
|
q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, ua.value as quota
|
|
,json_agg(g."name") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2
|
|
,json_agg(r.name) as role
|
|
from user_entity as u
|
|
left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota'
|
|
left join user_group_membership as ugm on ugm.user_id = u.id
|
|
left join keycloak_group as g on g.id = ugm.group_id
|
|
left join keycloak_group as g_parent on g.parent_group = g_parent.id
|
|
left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id
|
|
left join user_role_mapping as rm on rm.user_id = u.id
|
|
left join keycloak_role as r on r.id = rm.role_id
|
|
group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, ua.value
|
|
order by u.username"""
|
|
|
|
(headers, users) = self.keycloak_pg.select_with_headers(q)
|
|
|
|
users_with_lists = [
|
|
list(l[:-4])
|
|
+ ([[]] if l[-4] == [None] else [list(set(l[-4]))])
|
|
+ ([[]] if l[-3] == [None] else [list(set(l[-3]))])
|
|
+ ([[]] if l[-3] == [None] else [list(set(l[-2]))])
|
|
+ ([[]] if l[-1] == [None] else [list(set(l[-1]))])
|
|
for l in users
|
|
]
|
|
|
|
users_with_lists = [
|
|
list(l[:-4])
|
|
+ ([[]] if l[-4] == [None] else [list(set(l[-4]))])
|
|
+ ([[]] if l[-3] == [None] else [list(set(l[-3]))])
|
|
+ ([[]] if l[-3] == [None] else [list(set(l[-2]))])
|
|
+ ([[]] if l[-1] == [None] else [list(set(l[-1]))])
|
|
for l in users_with_lists
|
|
]
|
|
|
|
list_dict_users = [dict(zip(headers, r)) for r in users_with_lists]
|
|
return list_dict_users
|
|
|
|
|
|
da = DefaultAvatars()
|