#!/usr/bin/env python
|
|
import json
|
|
import logging as log
|
|
import os
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from pprint import pprint
|
|
|
|
import diceware
|
|
import yaml
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from keycloak import KeycloakAdmin
|
|
from postgres import Postgres
|
|
|
|
# Shared diceware settings used for every generated passphrase: each
# passphrase is built from 3 words taken from the "cat_ascii" wordlist.
# NOTE(review): handle_options(None) presumably falls back to parsing the
# process command line -- confirm this script is never run with arguments.
options = diceware.handle_options(None)
options.wordlist, options.num = "cat_ascii", 3
|
|
|
|
|
|
class KeycloakClient:
    """Reset Keycloak user passwords to generated temporary passphrases.

    Users are listed by querying the Keycloak PostgreSQL database directly,
    and the new passwords are applied through the Keycloak admin client.

    References:
        https://www.keycloak.org/docs-api/13.0/rest-api/index.html
        https://github.com/marcospereirampj/python-keycloak
        https://gist.github.com/kaqfa/99829941121188d7cef8271f93f52f1f
    """

    def __init__(
        self,
        url="http://isard-sso-keycloak:8080/auth/",
        username=None,
        password=None,
        realm="master",
        verify=True,
    ):
        """Store connection settings and open the Keycloak database handle.

        Args:
            url: Base URL of the Keycloak server.
            username: Admin user name; defaults to ``$KEYCLOAK_USER``.
            password: Admin password; defaults to ``$KEYCLOAK_PASSWORD``.
            realm: Realm the admin client authenticates against.
            verify: Passed through to ``KeycloakAdmin(verify=...)``.

        Raises:
            KeyError: If a required environment variable is not set.
        """
        self.url = url
        # Environment lookups happen here rather than in the signature so
        # that importing the module does not fail when the variables are
        # unset and explicit credentials are supplied by the caller.
        self.username = (
            os.environ["KEYCLOAK_USER"] if username is None else username
        )
        self.password = (
            os.environ["KEYCLOAK_PASSWORD"] if password is None else password
        )
        self.realm = realm
        self.verify = verify

        self.keycloak_pg = Postgres(
            "isard-apps-postgresql",
            "keycloak",
            os.environ["KEYCLOAK_DB_USER"],
            os.environ["KEYCLOAK_DB_PASSWORD"],
        )

    def connect(self):
        """Create a fresh ``KeycloakAdmin`` session in ``self.keycloak_admin``."""
        self.keycloak_admin = KeycloakAdmin(
            server_url=self.url,
            username=self.username,
            password=self.password,
            realm_name=self.realm,
            verify=self.verify,
        )

    def update_pwds(self):
        """Regenerate the password of every regular user (see ``get_users``)."""
        self.get_users()

    def get_users(self):
        """Generate, record and apply a new temporary password per user.

        The users ``admin`` and ``ddadmin`` and any user whose name starts
        with ``system_`` are skipped. The generated credentials are written
        to ``user_temp_passwd.csv`` (``id,username,password`` per line)
        before any password is changed in Keycloak.
        """
        self.connect()

        protected_names = ("admin", "ddadmin")
        userupdate = []
        for user in self.get_users_with_groups_and_roles():
            name = user["username"]
            if name in protected_names or name.startswith("system_"):
                continue
            print("Generating password for user " + name)
            userupdate.append(
                {
                    "id": user["id"],
                    "username": name,
                    "password": diceware.get_passphrase(options=options),
                }
            )

        # NOTE(review): this file holds plaintext passwords and is created
        # with the process' default permissions -- consider restricting it.
        with open("user_temp_passwd.csv", "w") as passwd_file:
            for entry in userupdate:
                passwd_file.write(
                    "%s,%s,%s\n"
                    % (entry["id"], entry["username"], entry["password"])
                )

        for entry in userupdate:
            print("Updating keycloak password for user " + entry["username"])
            self.update_user_pwd(entry["id"], entry["password"])

    def update_user_pwd(self, user_id, password, temporary=True):
        """Set ``password`` as the credential of the user ``user_id``.

        Args:
            user_id: Keycloak id of the user to update.
            password: New password value.
            temporary: Sent as the credential's ``temporary`` flag.
        """
        payload = {
            "credentials": [
                {"type": "password", "value": password, "temporary": temporary}
            ]
        }
        # A new admin session is opened for every update, as in the
        # original flow (presumably to avoid using an expired token).
        self.connect()
        self.keycloak_admin.update_user(user_id, payload)

    @staticmethod
    def _unique_or_empty(values):
        """Return the distinct items of one aggregated column.

        ``[None]`` is what the aggregate yields when the left join matched
        nothing, so it (and a missing value) is mapped to an empty list.
        """
        if values is None or values == [None]:
            return []
        return list(set(values))

    def get_users_with_groups_and_roles(self):
        """Return every Keycloak user with its groups and roles.

        Returns:
            A list of dicts keyed by the query's column headers. The last
            four columns (``group``, ``group_parent1``, ``group_parent2``
            and ``role``) are de-duplicated lists, empty when the user has
            no matching rows.
        """
        query = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, ua.value as quota
        ,json_agg(g."name") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2
        ,json_agg(r.name) as role
        from user_entity as u
        left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota'
        left join user_group_membership as ugm on ugm.user_id = u.id
        left join keycloak_group as g on g.id = ugm.group_id
        left join keycloak_group as g_parent on g.parent_group = g_parent.id
        left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id
        left join user_role_mapping as rm on rm.user_id = u.id
        left join keycloak_role as r on r.id = rm.role_id
        group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, ua.value
        order by u.username"""

        headers, rows = self.keycloak_pg.select_with_headers(query)

        list_dict_users = []
        for row in rows:
            scalar_columns = list(row[:-4])
            # Each aggregated column is checked against its own value; the
            # previous code tested column -3 while emitting column -2.
            aggregated_columns = [
                self._unique_or_empty(column) for column in row[-4:]
            ]
            list_dict_users.append(
                dict(zip(headers, scalar_columns + aggregated_columns))
            )
        return list_dict_users
|
|
|
|
|
|
# Script entry: build a client with the default connection settings and
# regenerate the passwords as soon as the module is executed.
k = KeycloakClient()
k.update_pwds()
|