refactor(admin): moved saml setup scripts to its own folder

darta 2021-12-29 00:40:07 +01:00
parent eec1f7fe22
commit c811ebb1dd
9 changed files with 660 additions and 18 deletions

View File

@ -12,7 +12,7 @@ import yaml
from jinja2 import Environment, FileSystemLoader
from keycloak import KeycloakAdmin
from admin import app
# from admin import app
from .postgres import Postgres

View File

@ -9,7 +9,7 @@ from datetime import datetime, timedelta
import mysql.connector
import yaml
from admin import app
# from admin import app
class Mysql:

View File

@ -9,7 +9,7 @@ from datetime import datetime, timedelta
import psycopg2
import yaml
from admin import app
# from admin import app
class Postgres:

View File

@ -0,0 +1,516 @@
#!/usr/bin/env python
# coding=utf-8
import json
import logging as log
import os
import time
import traceback
from datetime import datetime, timedelta
from pprint import pprint
import yaml
from jinja2 import Environment, FileSystemLoader
from keycloak import KeycloakAdmin
# from admin import app
from .postgres import Postgres
class KeycloakClient:
"""https://www.keycloak.org/docs-api/13.0/rest-api/index.html
https://github.com/marcospereirampj/python-keycloak
https://gist.github.com/kaqfa/99829941121188d7cef8271f93f52f1f
"""
def __init__(
self,
url="http://isard-sso-keycloak:8080/auth/",
username=os.environ["KEYCLOAK_USER"],
password=os.environ["KEYCLOAK_PASSWORD"],
realm="master",
verify=True,
):
self.url = url
self.username = username
self.password = password
self.realm = realm
self.verify = verify
self.keycloak_pg = Postgres(
"isard-apps-postgresql",
"keycloak",
os.environ["KEYCLOAK_DB_USER"],
os.environ["KEYCLOAK_DB_PASSWORD"],
)
def connect(self):
self.keycloak_admin = KeycloakAdmin(
server_url=self.url,
username=self.username,
password=self.password,
realm_name=self.realm,
verify=self.verify,
)
# from keycloak import KeycloakAdmin
# keycloak_admin = KeycloakAdmin(server_url="http://isard-sso-keycloak:8080/auth/",username="admin",password="keycloakkeycloak",realm_name="master",verify=False)
######## Example create group and subgroup
# try:
# self.add_group('level1')
# except:
# self.delete_group(self.get_group('/level1')['id'])
# self.add_group('level1')
# self.add_group('level2',parent=self.get_group('/level1')['id'])
# pprint(self.get_groups())
######## Example roles
# try:
# self.add_role('superman')
# except:
# self.delete_role('superman')
# self.add_role('superman')
# pprint(self.get_roles())
""" USERS """
def get_user_id(self, username):
self.connect()
return self.keycloak_admin.get_user_id(username)
def get_users(self):
self.connect()
return self.keycloak_admin.get_users({})
def get_users_with_groups_and_roles(self):
q = """select u.id, u.username, u.email, u.first_name, u.last_name, u.realm_id, u.enabled, ua.value as quota
,json_agg(g."id") as group, json_agg(g_parent."name") as group_parent1, json_agg(g_parent2."name") as group_parent2
,json_agg(r.name) as role
from user_entity as u
left join user_attribute as ua on ua.user_id=u.id and ua.name = 'quota'
left join user_group_membership as ugm on ugm.user_id = u.id
left join keycloak_group as g on g.id = ugm.group_id
left join keycloak_group as g_parent on g.parent_group = g_parent.id
left join keycloak_group as g_parent2 on g_parent.parent_group = g_parent2.id
left join user_role_mapping as rm on rm.user_id = u.id
left join keycloak_role as r on r.id = rm.role_id
group by u.id,u.username,u.email,u.first_name,u.last_name, u.realm_id, u.enabled, ua.value
order by u.username"""
(headers, users) = self.keycloak_pg.select_with_headers(q)
users_with_lists = [
list(l[:-4])
+ ([[]] if l[-4] == [None] else [list(set(l[-4]))])
+ ([[]] if l[-3] == [None] else [list(set(l[-3]))])
+ ([[]] if l[-3] == [None] else [list(set(l[-2]))])
+ ([[]] if l[-1] == [None] else [list(set(l[-1]))])
for l in users
]
users_with_lists = [
list(l[:-4])
+ ([[]] if l[-4] == [None] else [list(set(l[-4]))])
+ ([[]] if l[-3] == [None] else [list(set(l[-3]))])
+ ([[]] if l[-3] == [None] else [list(set(l[-2]))])
+ ([[]] if l[-1] == [None] else [list(set(l[-1]))])
for l in users_with_lists
]
list_dict_users = [dict(zip(headers, r)) for r in users_with_lists]
# self.connect()
# groups = self.keycloak_admin.get_groups()
# for user in list_dict_users:
# new_user_groups = []
# for group_id in user['group']:
# found = [g for g in groups if g['id'] == group_id][0]
# new_user_groups.append({'id':found['id'],
# 'name':found['name'],
# 'path':found['path']})
# user['group']=new_user_groups
return list_dict_users
def getparent(self, group_id, data):
# Recursively get full path from any group_id in the tree
path = ""
for item in data:
if group_id == item[0]:
path = self.getparent(item[2], data)
path = f"{path}/{item[1]}"
return path
def get_group_path(self, group_id):
# Get full path using getparent recursive func
# RETURNS: String with full path
q = """SELECT * FROM keycloak_group"""
groups = self.keycloak_pg.select(q)
return self.getparent(group_id, groups)
def get_user_groups_paths(self, user_id):
# Get full paths for user grups
# RETURNS list of paths
q = """SELECT group_id FROM user_group_membership WHERE user_id = '%s'""" % (
user_id
)
user_group_ids = self.keycloak_pg.select(q)
paths = []
for g in user_group_ids:
paths.append(self.get_group_path(g[0]))
return paths
## Too slow. Used the direct postgres
# def get_users_with_groups_and_roles(self):
# self.connect()
# users=self.keycloak_admin.get_users({})
# for user in users:
# user['groups']=[g['path'] for g in self.keycloak_admin.get_user_groups(user_id=user['id'])]
# user['roles']= [r['name'] for r in self.keycloak_admin.get_realm_roles_of_user(user_id=user['id'])]
# return users
def add_user(
self,
username,
first,
last,
email,
password,
group=False,
temporary=True,
enabled=True,
):
# RETURNS string with keycloak user id (the main id in this app)
self.connect()
username = username.lower()
try:
uid = self.keycloak_admin.create_user(
{
"email": email,
"username": username,
"enabled": enabled,
"firstName": first,
"lastName": last,
"credentials": [
{"type": "password", "value": password, "temporary": temporary}
],
}
)
except:
log.error(traceback.format_exc())
if group:
path = "/" + group if group[1:] != "/" else group
try:
gid = self.keycloak_admin.get_group_by_path(
path=path, search_in_subgroups=False
)["id"]
except:
self.keycloak_admin.create_group({"name": group})
gid = self.keycloak_admin.get_group_by_path(path)["id"]
self.keycloak_admin.group_user_add(uid, gid)
return uid
def update_user_pwd(self, user_id, password, temporary=True):
# Updates
payload = {
"credentials": [
{"type": "password", "value": password, "temporary": temporary}
]
}
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def user_update(self, user_id, enabled, email, first, last, groups=[], roles=[]):
## NOTE: Roles didn't seem to be updated/added. Also not confident with groups
# Updates
payload = {
"enabled": enabled,
"email": email,
"firstName": first,
"lastName": last,
"groups": groups,
"realmRoles": roles,
}
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def user_enable(self, user_id):
payload = {"enabled": True}
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def user_disable(self, user_id):
payload = {"enabled": False}
self.connect()
return self.keycloak_admin.update_user(user_id, payload)
def group_user_remove(self, user_id, group_id):
self.connect()
return self.keycloak_admin.group_user_remove(user_id, group_id)
# def add_user_role(self,user_id,role_id):
# self.connect()
# return self.keycloak_admin.assign_role(client_id=client_id, user_id=user_id, role_id=role_id, role_name="test")
def remove_user_realm_roles(self, user_id, roles):
self.connect()
roles = [
r
for r in self.get_user_realm_roles(user_id)
if r["name"] in ["admin", "manager", "teacher", "student"]
]
return self.keycloak_admin.delete_realm_roles_of_user(user_id, roles)
def delete_user(self, userid):
self.connect()
return self.keycloak_admin.delete_user(user_id=userid)
def get_user_groups(self, userid):
self.connect()
return self.keycloak_admin.get_user_groups(user_id=userid)
def get_user_realm_roles(self, userid):
self.connect()
return self.keycloak_admin.get_realm_roles_of_user(user_id=userid)
def add_user_client_role(self, client_id, user_id, role_id, role_name):
self.connect()
return self.keycloak_admin.assign_client_role(
client_id=client_id, user_id=user_id, role_id=role_id, role_name="test"
)
## GROUPS
def get_all_groups(self):
## RETURNS ONLY MAIN GROUPS WITH NESTED subGroups list
self.connect()
return self.keycloak_admin.get_groups()
def get_recursive_groups(self, l_groups, l=[]):
for d_group in l_groups:
d = {}
for key, value in d_group.items():
if key == "subGroups":
self.get_recursive_groups(value, l)
else:
d[key] = value
l.append(d)
return l
def get_groups(self, with_subgroups=True):
## RETURNS ALL GROUPS in root list
self.connect()
groups = self.keycloak_admin.get_groups()
return self.get_recursive_groups(groups)
subgroups = []
subgroups1 = []
# This needs to be recursive function
if with_subgroups:
for group in groups:
if len(group["subGroups"]):
for sg in group["subGroups"]:
subgroups.append(sg)
# for sgroup in subgroups:
# if len(sgroup['subGroups']):
# for sg1 in sgroup['subGroups']:
# subgroups1.append(sg1)
return groups + subgroups + subgroups1
def get_group_by_id(self, group_id):
self.connect()
return self.keycloak_admin.get_group(group_id=group_id)
def get_group_by_path(self, path, recursive=True):
self.connect()
return self.keycloak_admin.get_group_by_path(
path=path, search_in_subgroups=recursive
)
def add_group(self, name, parent=None, skip_exists=False):
self.connect()
if parent != None:
parent = self.get_group_by_path(parent)["id"]
return self.keycloak_admin.create_group({"name": name}, parent=parent)
def delete_group(self, group_id):
self.connect()
return self.keycloak_admin.delete_group(group_id=group_id)
def group_user_add(self, user_id, group_id):
self.connect()
return self.keycloak_admin.group_user_add(user_id, group_id)
def add_group_tree(self, path):
parts = path.split("/")
parent_path = "/"
for i in range(1, len(parts)):
if i == 1:
try:
self.add_group(parts[i], None, skip_exists=True)
except:
log.warning("KEYCLOAK: Group :" + parts[i] + " already exists.")
parent_path = parent_path + parts[i]
else:
try:
self.add_group(parts[i], parent_path, skip_exists=True)
except:
log.warning("KEYCLOAK: Group :" + parts[i] + " already exists.")
parent_path = parent_path + parts[i]
# parts=path.split('/')
# parent_path=None
# for i in range(1,len(parts)):
# # print('Adding group name '+parts[i]+' with parent path '+str(parent_path))
# try:
# self.add_group(parts[i],parent_path,skip_exists=True)
# except:
# if parent_path==None:
# parent_path='/'+parts[i]
# else:
# parent_path=self.get_group_by_path(parent_path)['path']
# parent_path=parent_path+'/'+parts[i]
# continue
# if parent_path==None:
# parent_path='/'+parts[i]
# else:
# parent_path=parent_path+'/'+parts[i]
# try:
# if i == 1: parent_id=self.add_group(parts[i])
# except:
# # Main already exists?? What a fail!
# parent_id=self.get_group(parent_id)['id']
# continue
# self.add_group(parts[i],parent_id)
def add_user_with_groups_and_role(
self, username, first, last, email, password, role, groups
):
## Add user
uid = self.add_user(username, first, last, email, password)
## Add user to role
log.info("User uid: " + str(uid) + " role: " + str(role))
try:
therole = role[0]
except:
therole = ""
log.info(self.assign_realm_roles(uid, role))
## Create groups in user
for g in groups:
log.warning("Creating keycloak group: " + g)
parts = g.split("/")
parent_path = None
for i in range(1, len(parts)):
# parent_id=None if parent_path==None else self.get_group(parent_path)['id']
try:
self.add_group(parts[i], parent_path, skip_exists=True)
except:
log.warning(
"Group "
+ str(parent_path)
+ " already exists. Skipping creation"
)
pass
if parent_path is None:
thepath = "/" + parts[i]
else:
thepath = parent_path + "/" + parts[i]
if thepath == "/":
log.warning(
"Not adding the user "
+ username
+ " to any group as does not have any..."
)
continue
gid = self.get_group_by_path(path=thepath)["id"]
log.warning(
"Adding "
+ username
+ " with uuid: "
+ uid
+ " to group "
+ g
+ " with uuid: "
+ gid
)
self.keycloak_admin.group_user_add(uid, gid)
if parent_path == None:
parent_path = ""
parent_path = parent_path + "/" + parts[i]
# self.group_user_add(uid,gid)
## ROLES
def get_roles(self):
self.connect()
return self.keycloak_admin.get_realm_roles()
def get_role(self, name):
self.connect()
return self.keycloak_admin.get_realm_role(name)
def add_role(self, name, description=""):
self.connect()
return self.keycloak_admin.create_realm_role(
{"name": name, "description": description}
)
def delete_role(self, name):
self.connect()
return self.keycloak_admin.delete_realm_role(name)
## CLIENTS
def get_client_roles(self, client_id):
self.connect()
return self.keycloak_admin.get_client_roles(client_id=client_id)
def add_client_role(self, client_id, name, description=""):
self.connect()
return self.keycloak_admin.create_client_role(
client_id, {"name": name, "description": description, "clientRole": True}
)
## SYSTEM
def get_server_info(self):
self.connect()
return self.keycloak_admin.get_server_info()
def get_server_clients(self):
self.connect()
return self.keycloak_admin.get_clients()
def get_server_rsa_key(self):
self.connect()
rsa_key = [
k for k in self.keycloak_admin.get_keys()["keys"] if k["type"] == "RSA"
][0]
return {"name": rsa_key["kid"], "certificate": rsa_key["certificate"]}
## REALM
def assign_realm_roles(self, user_id, role):
self.connect()
try:
role = [
r for r in self.keycloak_admin.get_realm_roles() if r["name"] == role
]
except:
return False
return self.keycloak_admin.assign_realm_roles(user_id=user_id, roles=role)
# return self.keycloak_admin.assign_realm_roles(user_id=user_id, client_id=None, roles=role)
## CLIENTS
def delete_client(self, clientid):
self.connect()
return self.keycloak_admin.delete_client(clientid)
def add_client(self, client):
self.connect()
return self.keycloak_admin.create_client(client)

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
# coding=utf-8
import json
import logging as log
import time
import traceback
from datetime import datetime, timedelta
import mysql.connector
import yaml
# from admin import app
class Mysql:
def __init__(self, host, database, user, password):
self.conn = mysql.connector.connect(
host=host, database=database, user=user, password=password
)
def select(self, sql):
self.cur = self.conn.cursor()
self.cur.execute(sql)
data = self.cur.fetchall()
self.cur.close()
return data
def update(self, sql):
self.cur = self.conn.cursor()
self.cur.execute(sql)
self.conn.commit()
self.cur.close()

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
# coding=utf-8
import json
import logging as log
import time
import traceback
from datetime import datetime, timedelta
import psycopg2
import yaml
# from admin import app
class Postgres:
def __init__(self, host, database, user, password):
self.conn = psycopg2.connect(
host=host, database=database, user=user, password=password
)
# def __del__(self):
# self.cur.close()
# self.conn.close()
def select(self, sql):
self.cur = self.conn.cursor()
self.cur.execute(sql)
data = self.cur.fetchall()
self.cur.close()
return data
def update(self, sql):
self.cur = self.conn.cursor()
self.cur.execute(sql)
self.conn.commit()
self.cur.close()
# return self.cur.fetchall()
def select_with_headers(self, sql):
self.cur = self.conn.cursor()
self.cur.execute(sql)
data = self.cur.fetchall()
fields = [a.name for a in self.cur.description]
self.cur.close()
return (fields, data)
# def update_moodle_saml_plugin(self):
# plugin[('idpmetadata', '<md:EntitiesDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" Name="urn:keycloak"><md:EntityDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" entityID="https://sso.'+app.config['DOMAIN']+'/auth/realms/master"><md:IDPSSODescriptor WantAuthnRequestsSigned="true" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><md:KeyDescriptor use="signing"><ds:KeyInfo><ds:KeyName>NrtA5ynG0htowP3SXw7dBJRIAMxn-1PwuuXwOwNhlRw</ds:KeyName><ds:X509Data><ds:X509Certificate>MIICmzCCAYMCBgF5jb0RCTANBgkqhkiG9w0BAQsFADARMQ8wDQYDVQQDDAZtYXN0ZXIwHhcNMjEwNTIxMDcwMjI4WhcNMzEwNTIxMDcwNDA4WjARMQ8wDQYDVQQDDAZtYXN0ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCI8xh/C0+frz3kgWiUbziTDls71R2YiXLSVE+bw7gbEgZUGCLhoEI679azMtIxmnzM/snIX+yTb12+XoYkgbiLTMPQfnH+Kiab6g3HL3KPfhqS+yWkFxOoCp6Ibmp7yPlVWuHH+MBfO8OBr/r8Ao7heFbuzjiLd1KG67rcoaxfDgMuBoEomg1bgEjFgHaQIrSC6OZzH0h987/arqufZXeXlfyiqScMPUi+u5IpDWSwz06UKP0k8mxzNSlpZ93CKOUSsV0SMLxqg7FQ3SGiOk577bGW9o9BDTkkmSo3Up6smc0LzwvvUwuNd0B1irGkWZFQN9OXJnJYf1InEebIMtmPAgMBAAEwDQYJKoZIhvcNAQELBQADggEBADM34+qEGeBQ22luphVTuVJtGxcbxLx7DfsT0QfJD/OuxTTbNAa1VRyarb5juIAkqdj4y2quZna9ZXLecVo4RkwpzPoKoAkYA8b+kHnWqEwJi9iPrDvKb+GR0bBkLPN49YxIZ8IdKX/PRa3yuLHe+loiNsCaS/2ZK2KO46COsqU4QX1iVhF9kWphNLybjNAX45B6cJLsa1g0vXLdm3kv3SB4I2fErFVaOoDtFIjttoYlXdpUiThkPXBfr7N67P3dZHaS4tjJh+IZ8I6TINpcsH8dBkUhzYEIPHCePwSiC1w6WDBLNDuKt1mj1CZrLq+1x+Yhrs+QNRheEKGi89HZ8N0=</ds:X509Certificate></ds:X509Data></ds:KeyInfo></md:KeyDescriptor><md:ArtifactResolutionService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml/resolve" index="0"/><md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:persistent</md:NameIDFormat><md:NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:transient</md:NameIDFormat><md:NameIDFormat>urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified</md:NameIDFormat><md:NameIDFormat>urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress</md:NameIDFormat><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:SOAP" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/><md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact" Location="https://sso.mydomain.duckdns.org/auth/realms/master/protocol/saml"/></md:IDPSSODescriptor></md:EntityDescriptor></md:EntitiesDescriptor>')]
# pg_update = """UPDATE mdl_config_plugins set title = %s where plugin = auth_saml2 and name ="""
# cursor.execute(pg_update, (title, bookid))
# connection.commit()
# count = cursor.rowcount
# print(count, "Successfully Updated!")

View File

@ -13,8 +13,8 @@ from datetime import datetime, timedelta
import psycopg2
import yaml
from admin.lib.keycloak_client import KeycloakClient
from admin.lib.postgres import Postgres
from lib.keycloak_client import KeycloakClient
from lib.postgres import Postgres
app = {}
app["config"] = {}
@ -51,12 +51,19 @@ class MoodleSaml:
time.sleep(2)
log.info("Got moodle site identifier.")
basepath = os.path.dirname(__file__)
ready = False
while not ready:
try:
with open(
os.path.join(
"./moodledata/saml2/moodle." + os.environ["DOMAIN"] + ".crt"
os.path.abspath(
os.path.join(
basepath,
"../moodledata/saml2/moodle."
+ os.environ["DOMAIN"]
+ ".crt",
)
),
"r",
) as crt:
@ -73,8 +80,13 @@ class MoodleSaml:
while not ready:
try:
with open(
os.path.join(
"./moodledata/saml2/moodle." + os.environ["DOMAIN"] + ".pem"
os.path.abspath(
os.path.join(
basepath,
"../moodledata/saml2/moodle."
+ os.environ["DOMAIN"]
+ ".pem",
)
),
"r",
) as pem:
@ -95,7 +107,12 @@ class MoodleSaml:
# with open(os.path.join("./moodledata/saml2/"+os.environ['MOODLE_SAML_PRIVATEKEYPASS'].replace("moodle."+os.environ['DOMAIN'],'')+'.idp.xml'),"w") as xml:
# xml.write(self.parse_idp_metadata())
with open(
os.path.join("./moodledata/saml2/0f635d0e0f3874fff8b581c132e6c7a7.idp.xml"),
os.path.abspath(
os.path.join(
basepath,
"../moodledata/saml2/0f635d0e0f3874fff8b581c132e6c7a7.idp.xml",
)
),
"w",
) as xml:
xml.write(self.parse_idp_metadata())

View File

@ -13,8 +13,8 @@ from datetime import datetime, timedelta
import psycopg2
import yaml
from admin.lib.keycloak_client import KeycloakClient
from admin.lib.postgres import Postgres
from lib.keycloak_client import KeycloakClient
from lib.postgres import Postgres
app = {}
app["config"] = {}
@ -43,10 +43,17 @@ class NextcloudSaml:
time.sleep(2)
log.info("Connected to nextcloud database.")
basepath = os.path.dirname(__file__)
ready = False
while not ready:
try:
with open(os.path.join("./saml_certs/public.cert"), "r") as crt:
with open(
os.path.abspath(
os.path.join(basepath, "../saml_certs/public.cert")
),
"r",
) as crt:
app["config"]["PUBLIC_CERT"] = crt.read()
ready = True
except IOError:
@ -64,7 +71,12 @@ class NextcloudSaml:
ready = False
while not ready:
try:
with open(os.path.join("./saml_certs/private.key"), "r") as pem:
with open(
os.path.abspath(
os.path.join(basepath, "../saml_certs/private.key")
),
"r",
) as pem:
app["config"]["PRIVATE_KEY"] = pem.read()
ready = True
except IOError:

View File

@ -13,8 +13,8 @@ from datetime import datetime, timedelta
import psycopg2
import yaml
from admin.lib.keycloak_client import KeycloakClient
from admin.lib.mysql import Mysql
from lib.keycloak_client import KeycloakClient
from lib.mysql import Mysql
app = {}
app["config"] = {}
@ -43,10 +43,17 @@ class WordpressSaml:
time.sleep(2)
log.info("Connected to wordpress database.")
basepath = os.path.dirname(__file__)
ready = False
while not ready:
try:
with open(os.path.join("./saml_certs/public.cert"), "r") as crt:
with open(
os.path.abspath(
os.path.join(basepath, "../saml_certs/public.cert")
),
"r",
) as crt:
app["config"]["PUBLIC_CERT_RAW"] = crt.read()
app["config"]["PUBLIC_CERT"] = self.cert_prepare(
app["config"]["PUBLIC_CERT_RAW"]
@ -67,7 +74,12 @@ class WordpressSaml:
ready = False
while not ready:
try:
with open(os.path.join("./saml_certs/private.key"), "r") as pem:
with open(
os.path.abspath(
os.path.join(basepath, "../saml_certs/private.key")
),
"r",
) as pem:
app["config"]["PRIVATE_KEY"] = self.cert_prepare(pem.read())
ready = True
except IOError: