#!/usr/bin/env python import time ,os from datetime import datetime, timedelta import logging as log import traceback import yaml, json from pprint import pprint from jinja2 import Environment, FileSystemLoader from keycloak import KeycloakAdmin from postgres import Postgres import diceware options = diceware.handle_options(None) options.wordlist = 'cat_ascii' options.num = 3 class KeycloakClient(): """https://www.keycloak.org/docs-api/13.0/rest-api/index.html https://github.com/marcospereirampj/python-keycloak https://gist.github.com/kaqfa/99829941121188d7cef8271f93f52f1f """ def __init__(self, url="http://isard-sso-keycloak:8080/auth/", username=os.environ['KEYCLOAK_USER'], password=os.environ['KEYCLOAK_PASSWORD'], realm='master', verify=True): self.url=url self.username=username self.password=password self.realm=realm self.verify=verify self.keycloak_pg=Postgres('isard-apps-postgresql','keycloak',os.environ['KEYCLOAK_DB_USER'],os.environ['KEYCLOAK_DB_PASSWORD']) def connect(self): self.keycloak_admin = KeycloakAdmin(server_url=self.url, username=self.username, password=self.password, realm_name=self.realm, verify=self.verify) def update_pwds(self): self.get_users() def get_users(self): self.connect() users=self.get_users_with_groups_and_roles() userupdate=[] for u in users: if u['username'] not in ['admin','ddadmin'] and not u['username'].startswith('system_'): print('Generating password for user '+u['username']) userupdate.append({'id':u['id'], 'username':u['username'], 'password': diceware.get_passphrase(options=options)}) with open("user_temp_passwd.csv","w") as csv: for user in userupdate: csv.write("%s,%s,%s\n"%(user['id'],user['username'],user['password'])) for u in userupdate: print('Updating keycloak password for user '+u['username']) self.update_user_pwd(u['id'],u['password']) def update_user_pwd(self,user_id,password,temporary=True): payload={"credentials":[{"type":"password", "value":password, "temporary":temporary}]} self.connect() self.keycloak_admin.update_user( user_id, payload) def get_users_with_groups_and_roles(self): q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, ua.value as quota ,json_agg(g."name") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2 ,json_agg(r.name) as role from user_entity as u left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota' left join user_group_membership as ugm on ugm.user_id = u.id left join keycloak_group as g on g.id = ugm.group_id left join keycloak_group as g_parent on g.parent_group = g_parent.id left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id left join user_role_mapping as rm on rm.user_id = u.id left join keycloak_role as r on r.id = rm.role_id group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, ua.value order by u.username""" # q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, ua.value as quota, g.id, g.path, g.name, # --,json_agg(g."name") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2 # --,json_agg(r.name) as role # from user_entity as u # left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota' # left join user_group_membership as ugm on ugm.user_id = u.id # left join keycloak_group as g on g.id = ugm.group_id # --left join keycloak_group as g_parent on g.parent_group = g_parent.id # --left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id # left join user_role_mapping as rm on rm.user_id = u.id # left join keycloak_role as r on r.id = rm.role_id # --group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, ua.value # order by u.username""" # q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, ua.value as quota # ,json_agg(g."name") as group_name,json_agg(g."id") as group_id,json_agg(g."path") as group_path # ,json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2 # ,json_agg(r.name) as role # from user_entity as u # left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota' # left join user_group_membership as ugm on ugm.user_id = u.id # left join keycloak_group as g on g.id = ugm.group_id # left join keycloak_group as g_parent on g.parent_group = g_parent.id # left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id # left join user_role_mapping as rm on rm.user_id = u.id # left join keycloak_role as r on r.id = rm.role_id # group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, ua.value # order by u.username""" (headers,users)=self.keycloak_pg.select_with_headers(q) users_with_lists = [list(l[:-4])+([[]] if l[-4] == [None] else [list(set(l[-4]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-3]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-2]))]) +\ ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users] users_with_lists = [list(l[:-4])+([[]] if l[-4] == [None] else [list(set(l[-4]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-3]))]) +\ ([[]] if l[-3] == [None] else [list(set(l[-2]))]) +\ ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users_with_lists] list_dict_users = [dict(zip(headers, r)) for r in users_with_lists] return list_dict_users k=KeycloakClient() k.update_pwds()