#
#   Copyright © 2021,2022 IsardVDI S.L.
#
#   This file is part of DD
#
#   DD is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Affero General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or (at your
#   option) any later version.
#
#   DD is distributed in the hope that it will be useful, but WITHOUT ANY
#   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#   FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
#   details.
#
#   You should have received a copy of the GNU Affero General Public License
#   along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# Example client for the DD admin API ("/ddapi").
#
# The script signs a short-lived HS256 token with API_SECRET, then issues a
# fixed sequence of requests against https://admin.$DOMAIN/ddapi and prints the
# method, URL, status code, any payload sent, and the decoded response (or the
# error/description pair on a non-200 status).
#
# Required environment variables: DOMAIN, API_SECRET.

import json
import os
import secrets
import time
import traceback
from datetime import datetime, timedelta
from pprint import pprint

import requests
from jose import jwt

## SETUP
domain = f"admin.{ os.environ['DOMAIN'] }"
secret = os.environ['API_SECRET']
## END SETUP

# Seconds to wait for the server before giving up. Without a timeout, requests
# blocks forever when the server accepts the connection but never answers.
REQUEST_TIMEOUT = 30

# Number of entries shown for endpoints that return whole listings.
LIST_PREVIEW = 2

auths = {}
dbconn = None
base = "https://" + domain + "/ddapi"

raw_jwt_data = {
    "exp": datetime.utcnow() + timedelta(minutes=5),
    "kid": "test",
}
admin_jwt = jwt.encode(raw_jwt_data, secret, algorithm="HS256")
# Kept under its own name so the imported ``jwt`` module is not overwritten.
auth_headers = {"Authorization": "Bearer " + admin_jwt}


def _show_call(title, method, path, payload=None, max_items=None):
    """Perform one API request and print a report of it.

    Args:
        title: Heading line printed before the request is sent.
        method: HTTP verb in upper case ("GET", "POST", "PUT", "DELETE").
        path: Path appended to ``base`` to build the request URL.
        payload: Optional object sent as the JSON body; when given it is also
            echoed after the summary line.
        max_items: When set and the decoded body is a list, only this many
            leading items are printed.

    Returns:
        The ``requests.Response`` object, so callers may inspect it further.

    Raises:
        requests.RequestException: on connection failures or timeouts; these
            are deliberately not swallowed so a broken endpoint stays visible.
    """
    print(title)
    url = base + path

    request_kwargs = {
        "headers": auth_headers,
        "verify": True,
        "timeout": REQUEST_TIMEOUT,
    }
    if payload is not None:
        request_kwargs["json"] = payload
    response = requests.request(method, url, **request_kwargs)

    summary = f"METHOD: {method}, URL: {url}, STATUS_CODE:{response.status_code}"
    if payload is not None:
        summary += f", {method} DATA:"
    print(summary)
    if payload is not None:
        pprint(payload)

    # The body is not guaranteed to be JSON (a proxy in front of the API may
    # answer with HTML), so decode defensively instead of letting the decoder
    # exception hide the actual HTTP status.
    try:
        body = json.loads(response.text)
        is_json = True
    except ValueError:
        body = None
        is_json = False

    if response.status_code == 200:
        print("RESPONSE:")
        if not is_json:
            print(response.text)
        elif max_items is not None and isinstance(body, list):
            pprint(body[:max_items])
        else:
            pprint(body)
    elif is_json and isinstance(body, dict):
        print(
            "ERROR: "
            + str(body.get("error"))
            + " DESCRIPTION: "
            + str(body.get("description"))
        )
    else:
        print("ERROR: unexpected response body DESCRIPTION: " + response.text)

    return response


#######################################################################################################################
# Listings: users, groups and roles (output truncated to the first entries).
_show_call(" ----- USUARIS AL SISTEMA", "GET", "/users", max_items=LIST_PREVIEW)

_show_call(
    " ----- USUARIS AL SISTEMA QUE CONTENEN UN TEXT",
    "POST",
    "/users/filter",
    payload={"text": "alu"},
    max_items=LIST_PREVIEW,
)

_show_call(" ----- GRUPS AL SISTEMA", "GET", "/groups", max_items=LIST_PREVIEW)

_show_call(
    " ----- USUARIS DEL GRUP",
    "POST",
    "/group/users",
    payload={"id": "test00.classeB"},
    max_items=LIST_PREVIEW,
)

_show_call(" ----- ROLS AL SISTEMA", "GET", "/roles", max_items=LIST_PREVIEW)

_show_call(
    " ----- USUARIS DEL ROL",
    "POST",
    "/role/users",
    payload={"id": "teacher"},
    max_items=LIST_PREVIEW,
)

print("\nXXXXXXXXXXXXXXXXX ACTIONS ON USER XXXXXXXXXXXXXXXXXXXXXX\n")
#######################################################################################################################
# The sequence starts with GET + DELETE on "nou.usuari", before the POST below
# creates that same username.
_show_call(" ----- GET USER", "GET", "/user/nou.usuari")

_show_call(" ----- DELETE USER", "DELETE", "/user/nou.usuari")

user = {
    "username": "nou.usuari",
    "first": "Nou",
    "last": "Usuari",
    "email": "nou.usuari@nodns.com",
    "password": "1n2n3n4n5n6",
    "quota": "default",
    "enabled": True,
    "role": "student",
    "groups": ["test00.classeB"],
}
_show_call(" ----- POST NEW USER", "POST", "/user", payload=user)

update_user = {
    "id": "nou.usuari",
    "email": "nou.usuari@nodns.com",
    "enabled": True,
    "first": "Antic",
    "groups": ["test00.classeB"],
    "last": "Usuari",
    "quota": "default",
    "quota_used_bytes": "0 MB",
    "role": "teacher",
}
_show_call(" ----- UPDATE USER", "PUT", "/user/nou.usuari", payload=update_user)

_show_call(" ----- GET USER", "GET", "/user/nou.usuari")

_show_call(" ----- DELETE USER", "DELETE", "/user/nou.usuari")

print("\nXXXXXXXXXXXXXXXXX ACTIONS ON GROUP XXXXXXXXXXXXXXXXXXXXXX\n")
#######################################################################################################################
_show_call(" ----- GET GROUP", "GET", "/group/teacher")

group = {"name": "test"}
_show_call(" ----- POST NEW GROUP", "POST", "/group", payload=group)

_show_call(" ----- DELETE GROUP", "DELETE", "/group/test")