Merge branch 'api-jwt' into 'master'

Api jwt

See merge request isard/isard-sso!78
Josep Maria Viñolas Auquer 2022-06-02 10:39:45 +00:00
commit 0e550746d6
22 changed files with 1374 additions and 548 deletions

View File

@ -3,7 +3,6 @@ Flask-Login==0.5.0
eventlet==0.33.0
Flask-SocketIO==5.1.0
bcrypt==3.2.0
diceware==0.9.6
mysql-connector-python==8.0.25
psycopg2==2.8.6
@ -12,5 +11,6 @@ minio==7.0.3
urllib3==1.26.6
schema==0.7.5
Werkzeug~=2.0.0
# Unused yet
#flask-oidc==1.4.0
python-jose==3.3.0
Cerberus==1.3.4
PyYAML==6.0

View File

@ -108,4 +108,4 @@ def send_custom(path):
"""
Import all views
"""
from .views import ApiViews, InternalViews, LoginViews, WebViews
from .views import ApiViews, AppViews, LoginViews, WebViews, WpViews

View File

@ -0,0 +1,99 @@
# Copyright 2017 the Isard-vdi project authors:
# Josep Maria Viñolas Auquer
# Alberto Larraz Dalmases
# License: AGPLv3
import json
import logging as log
import os
import traceback
from functools import wraps
from flask import request
from jose import jwt
from admin import app
from ..lib.api_exceptions import Error
def get_header_jwt_payload():
return get_token_payload(get_token_auth_header())
def get_token_header(header):
"""Obtains the Access Token from the a Header"""
auth = request.headers.get(header, None)
if not auth:
raise Error(
"unauthorized",
"Authorization header is expected",
traceback.format_stack(),
)
parts = auth.split()
if parts[0].lower() != "bearer":
raise Error(
"unauthorized",
"Authorization header must start with Bearer",
traceback.format_stack(),
)
elif len(parts) == 1:
raise Error("bad_request", "Token not found")
elif len(parts) > 2:
raise Error(
"unauthorized",
"Authorization header must be Bearer token",
traceback.format_stack(),
)
return parts[1] # Token
def get_token_auth_header():
return get_token_header("Authorization")
def get_token_payload(token):
# log.warning("The received token in get_token_payload is: " + str(token))
try:
claims = jwt.get_unverified_claims(token)
secret = app.config["API_SECRET"]
except:
log.warning(
"JWT token with invalid parameters. Can not parse it.: " + str(token)
)
raise Error(
"unauthorized",
"Unable to parse authentication parameters token.",
traceback.format_stack(),
)
try:
payload = jwt.decode(
token,
secret,
algorithms=["HS256"],
options=dict(verify_aud=False, verify_sub=False, verify_exp=True),
)
except jwt.ExpiredSignatureError:
log.warning("Token expired")
raise Error("unauthorized", "Token is expired", traceback.format_stack())
except jwt.JWTClaimsError:
raise Error(
"unauthorized",
"Incorrect claims, please check the audience and issuer",
traceback.format_stack(),
)
except Exception:
raise Error(
"unauthorized",
"Unable to parse authentication token.",
traceback.format_stack(),
)
if payload.get("data", False):
return payload["data"]
return payload

View File

@ -30,7 +30,8 @@ options.num = 3
import secrets
from .events import Events
from .api_exceptions import Error
from .events import Events, sio_event_send
from .exceptions import UserExists, UserNotFound
from .helpers import (
count_repeated,
@ -161,7 +162,7 @@ class Admin:
ddmail,
ddpassword,
group="admin",
temporary=False,
password_temporary=False,
)
self.keycloak.assign_realm_roles(uid, "admin")
log.warning("KEYCLOAK: OK")
@ -395,6 +396,7 @@ class Admin:
# return users_list
def get_mix_users(self):
sio_event_send("get_users", {"you_win": "you got the users!"})
return self.internal["users"]
def _get_mix_users(self):
@ -466,6 +468,10 @@ class Admin:
def _get_roles(self):
return filter_roles_listofdicts(self.keycloak.get_roles())
def get_group_by_name(self, group_name):
group = [g for g in self.internal["groups"] if g["name"] == group_name]
return group[0] if len(group) else False
def get_keycloak_groups(self):
log.warning("Loading keycloak groups...")
return self.keycloak.get_groups()
@ -627,7 +633,7 @@ class Admin:
"gids": pathslist,
"quota": u["quota"],
"roles": [u["role"].strip()],
"temporary": True
"password_temporary": True
if u["password_temporal"].lower() == "yes"
else False,
"password": self.get_dice_pwd()
@ -798,7 +804,7 @@ class Admin:
u["last"],
u["email"],
u["password"],
temporary=u["temporary"],
password_temporary=u["password_temporary"],
)
self.av.add_user_default_avatar(uid, u["roles"][0])
# Add user to role and group rolename
@ -1291,8 +1297,8 @@ class Admin:
externaluser["gids"].append(data["action"])
return True
def user_update_password(self, userid, password, temporary):
return self.keycloak.update_user_pwd(userid, password, temporary)
def user_update_password(self, userid, password, password_temporary):
return self.keycloak.update_user_pwd(userid, password, password_temporary)
def update_users_from_keycloak(self):
kgroups = self.keycloak.get_groups()
@ -1669,6 +1675,7 @@ class Admin:
ev.update_text("Syncing data from applications...")
self.resync_data()
ev.update_text("User deleted")
sio_event_send("delete_user", {"userid": userid})
return True
def get_user(self, userid):
@ -1695,6 +1702,22 @@ class Admin:
pathpart = pathpart + "." + part
pathslist.append(pathpart)
for path in pathslist:
path = "/" + path.replace(".", "/")
log.warning(
" KEYCLOAK USERS: Assign user " + u["username"] + " to group " + path
)
try:
gid = self.keycloak.get_group_by_path(path=path)["id"]
except:
return False
# gid = self.keycloak.add_group_tree(path)
# log.warning("THE PATH "+str(path)+" HAS GID "+str(gid))
# self.moodle.add_system_cohort(path)
# self.nextcloud.add_group(path)
# self.resync_data()
# gid = self.keycloak.get_group_by_path(path=path)["id"]
### KEYCLOAK
#######################
ev = Events("Add user", u["username"], total=5)
@ -1706,18 +1729,14 @@ class Admin:
u["email"],
u["password"],
enabled=u["enabled"],
password_temporary=u.get("password_temporary", True),
)
self.av.add_user_default_avatar(uid, u["role"])
# Add user to role and group rolename
log.warning(
" KEYCLOAK USERS: Assign user "
+ u["username"]
+ " with initial pwd "
+ u["password"]
+ " to role "
+ u["role"]
" KEYCLOAK USERS: Assign user " + u["username"] + " to role " + u["role"]
)
self.keycloak.assign_realm_roles(uid, u["role"])
gid = self.keycloak.get_group_by_path(path="/" + u["role"])["id"]
@ -1726,12 +1745,9 @@ class Admin:
# Add user to groups
for path in pathslist:
path = "/" + path.replace(".", "/")
log.warning(
" KEYCLOAK USERS: Assign user " + u["username"] + " to group " + path
)
gid = self.keycloak.get_group_by_path(path=path)["id"]
self.keycloak.group_user_add(uid, gid)
ev.increment({"name": "Added to system groups", "data": []})
ev.increment({"name": "Added to system groups", "data": []})
pathslist.append(u["role"])
### MOODLE
@ -1812,6 +1828,8 @@ class Admin:
log.error(traceback.format_exc())
self.resync_data()
sio_event_send("new_user", u)
return uid
def add_group(self, g):
# TODO: Check if exists
@ -1830,6 +1848,7 @@ class Admin:
self.moodle.add_system_cohort(new_path, description=g["description"])
self.nextcloud.add_group(new_path)
self.resync_data()
return new_path
def delete_group_by_id(self, group_id):
ev = Events("Deleting group", "Deleting from keycloak")
@ -1843,6 +1862,7 @@ class Admin:
+ str(group_id)
+ " as it does not exist!"
)
raise Error("not_found", "Group " + group_id + " not found.")
# {'id': '966ad67c-499a-4f56-bd1d-283691cde0e7', 'name': 'asdgfewfwe', 'path': '/asdgfewfwe', 'attributes': {}, 'realmRoles': [], 'clientRoles': {}, 'subGroups': [], 'access': {'view': True, 'manage': True, 'manageMembership': True}}
@ -1886,3 +1906,6 @@ class Admin:
self.moodle.delete_cohorts(cohort)
self.nextcloud.delete_group(gid)
self.resync_data()
def set_nextcloud_user_mail(self, data):
self.nextcloud.set_user_mail(data)

View File

@ -0,0 +1,143 @@
import inspect
import json
import logging as log
import os
import traceback
from flask import jsonify, request
from admin import app
content_type = {"Content-Type": "application/json"}
ex = {
"bad_request": {
"error": {
"error": "bad_request",
"msg": "Bad request",
},
"status_code": 400,
},
"unauthorized": {
"error": {
"error": "unauthorized",
"msg": "Unauthorized",
},
"status_code": 401,
},
"forbidden": {
"error": {
"error": "forbidden",
"msg": "Forbidden",
},
"status_code": 403,
},
"not_found": {
"error": {
"error": "not_found",
"msg": "Not found",
},
"status_code": 404,
},
"conflict": {
"error": {
"error": "conflict",
"msg": "Conflict",
},
"status_code": 409,
},
"internal_server": {
"error": {
"error": "internal_server",
"msg": "Internal server error",
},
"status_code": 500,
},
"gateway_timeout": {
"error": {
"error": "gateway_timeout",
"msg": "Gateway timeout",
},
"status_code": 504,
},
"precondition_required": {
"error": {
"error": "precondition_required",
"msg": "Precondition required",
},
"status_code": 428,
},
"insufficient_storage": {
"error": {
"error": "insufficient_storage",
"msg": "Insufficient storage",
},
"status_code": 507,
},
}
class Error(Exception):
def __init__(self, error="bad_request", description="", debug="", data=None):
self.error = ex[error]["error"].copy()
self.error["function"] = (
inspect.stack()[1][1].split(os.sep)[-1]
+ ":"
+ str(inspect.stack()[1][2])
+ ":"
+ inspect.stack()[1][3]
)
self.error["function_call"] = (
inspect.stack()[2][1].split(os.sep)[-1]
+ ":"
+ str(inspect.stack()[2][2])
+ ":"
+ inspect.stack()[2][3]
)
self.error["description"] = str(description)
self.error["debug"] = "{}\n\r{}{}".format(
"----------- DEBUG START -------------",
debug,
"----------- DEBUG STOP -------------",
)
self.error["request"] = (
"{}\n{}\r\n{}\r\n\r\n{}{}".format(
"----------- REQUEST START -----------",
request.method + " " + request.url,
"\r\n".join("{}: {}".format(k, v) for k, v in request.headers.items()),
request.body if hasattr(request, "body") else "",
"----------- REQUEST STOP -----------",
)
if request
else ""
)
self.error["data"] = (
"{}\n{}\n{}".format(
"----------- DATA START -----------",
json.dumps(data, indent=2),
"----------- DATA STOP -----------",
)
if data
else ""
)
self.status_code = ex[error]["status_code"]
self.content_type = content_type
log.debug(
"%s - %s - [%s -> %s]\r\n%s\r\n%s\r\n%s"
% (
error,
str(description),
self.error["function_call"],
self.error["function"],
self.error["debug"],
self.error["request"],
self.error["data"],
)
)
@app.errorhandler(Error)
def handle_user_error(ex):
response = jsonify(ex.error)
response.status_code = ex.status_code
response.headers = {"content-type": content_type}
return response

View File

@ -24,6 +24,16 @@ from flask_socketio import (
from admin import app
def sio_event_send(event, data):
app.socketio.emit(
event,
json.dumps(data),
namespace="/sio/events",
room="events",
)
sleep(0.001)
class Events:
def __init__(self, title, text="", total=0, table=False, type="info"):
# notice, info, success, and error

View File

@ -12,6 +12,7 @@ import yaml
from jinja2 import Environment, FileSystemLoader
from keycloak import KeycloakAdmin
from .api_exceptions import Error
from .helpers import get_recursive_groups, kpath2kpaths
from .postgres import Postgres
@ -152,7 +153,7 @@ class KeycloakClient:
email,
password,
group=False,
temporary=True,
password_temporary=True,
enabled=True,
):
# RETURNS string with keycloak user id (the main id in this app)
@ -167,12 +168,20 @@ class KeycloakClient:
"firstName": first,
"lastName": last,
"credentials": [
{"type": "password", "value": password, "temporary": temporary}
{
"type": "password",
"value": password,
"temporary": password_temporary,
}
],
}
)
except:
except Exception as e:
log.error(traceback.format_exc())
raise Error(
"conflict",
"user/email already exists: " + str(username) + "/" + str(email),
)
if group:
path = "/" + group if group[1:] != "/" else group
@ -186,11 +195,11 @@ class KeycloakClient:
self.keycloak_admin.group_user_add(uid, gid)
return uid
def update_user_pwd(self, user_id, password, temporary=True):
def update_user_pwd(self, user_id, password, password_temporary=True):
# Updates
payload = {
"credentials": [
{"type": "password", "value": password, "temporary": temporary}
{"type": "password", "value": password, "temporary": password_temporary}
]
}
self.connect()

View File

@ -3,6 +3,12 @@ import os
import traceback
from admin import app
from pprint import pprint
from minio import Minio
from minio.commonconfig import REPLACE, CopySource
from minio.deleteobjects import DeleteObject
from requests import get, post
legal_path= os.path.join(app.root_path, "static/templates/pages/legal/")

View File

@ -6,9 +6,45 @@ import os
import sys
import traceback
import yaml
from cerberus import Validator, rules_set_registry, schema_registry
from admin import app
class AdminValidator(Validator):
None
# def _normalize_default_setter_genid(self, document):
# return _parse_string(document["name"])
# def _normalize_default_setter_genidlower(self, document):
# return _parse_string(document["name"]).lower()
# def _normalize_default_setter_gengroupid(self, document):
# return _parse_string(
# document["parent_category"] + "-" + document["uid"]
# ).lower()
def load_validators(purge_unknown=True):
validators = {}
schema_path = os.path.join(app.root_path, "schemas")
for schema_filename in os.listdir(schema_path):
try:
with open(os.path.join(schema_path, schema_filename)) as file:
schema_yml = file.read()
schema = yaml.load(schema_yml, Loader=yaml.FullLoader)
validators[schema_filename.split(".")[0]] = AdminValidator(
schema, purge_unknown=purge_unknown
)
except IsADirectoryError:
None
return validators
app.validators = load_validators()
class loadConfig:
def __init__(self, app=None):
try:
@ -34,6 +70,7 @@ class loadConfig:
app.config.setdefault(
"VERIFY", True if os.environ["VERIFY"] == "true" else False
)
app.config.setdefault("API_SECRET", os.environ.get("API_SECRET"))
except Exception as e:
log.error(traceback.format_exc())
raise

View File

@ -10,6 +10,7 @@ import traceback
import urllib
import requests
from psycopg2 import sql
# from ..lib.log import *
from admin import app
@ -518,3 +519,47 @@ class Nextcloud:
# 101 - invalid input data
# 102 - group already exists
# 103 - failed to add the group
def set_user_mail(self, data):
query = """SELECT * FROM "oc_mail_accounts" WHERE "email" = '%s'"""
sql_query = sql.SQL(query.format(data["email"]))
if not len(self.nextcloud_pg.select(sql_query)):
query = """INSERT INTO "oc_mail_accounts" ("user_id","name","email","inbound_host","inbound_port","inbound_ssl_mode","inbound_user","inbound_password","outbound_host","outbound_port","outbound_ssl_mode","outbound_user","outbound_password") VALUES
('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');"""
account = [
data["user_id"],
data["name"],
data["email"],
data["inbound_host"],
data["inbound_port"],
data["inbound_ssl_mode"],
data["inbound_user"],
data["inbound_password"],
data["outbound_host"],
data["outbound_port"],
data["outbound_ssl_mode"],
data["outbound_user"],
data["outbound_password"],
]
else:
query = """UPDATE "oc_mail_accounts" SET ("user_id","name","email","inbound_host","inbound_port","inbound_ssl_mode","inbound_user","inbound_password","outbound_host","outbound_port","outbound_ssl_mode","outbound_user","outbound_password") =
('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s') WHERE email = '%s';"""
account = [
data["user_id"],
data["name"],
data["email"],
data["inbound_host"],
data["inbound_port"],
data["inbound_ssl_mode"],
data["inbound_user"],
data["inbound_password"],
data["outbound_host"],
data["outbound_port"],
data["outbound_ssl_mode"],
data["outbound_user"],
data["outbound_password"],
data["email"],
]
sql_query = sql.SQL(query.format(",".join([str(acc) for acc in account])))
self.nextcloud_pg.update(sql_query)

View File

@ -0,0 +1,11 @@
name:
required: true
type: string
description:
required: false
type: string
default: "Api created"
parent:
required: false
type: string
default: ""

View File

@ -0,0 +1,34 @@
user_id:
type: string
required: true
name:
type: string
required: false
email:
type: string
required: true
regex: ^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$
inbound_host:
type: string
required: true
inbound_port:
type: integer
required: true
inbound_ssl_mode:
type: string
default: ssl
inbound_user:
type: string
required: true
outbound_host:
type: string
required: true
outbound_port:
type: integer
required: true
outbound_ssl_mode:
type: string
default: ssl
outbound_user:
type: string
required: true

View File

@ -0,0 +1,37 @@
mails:
type: list
schema:
user_id:
type: string
required: true
name:
type: string
required: false
email:
type: string
required: true
regex: ^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$
inbound_host:
type: string
required: true
inbound_port:
type: integer
required: true
inbound_ssl_mode:
type: string
default: ssl
inbound_user:
type: string
required: true
outbound_host:
type: string
required: true
outbound_port:
type: integer
required: true
outbound_ssl_mode:
type: string
default: ssl
outbound_user:
type: string
required: true

View File

@ -0,0 +1,34 @@
username:
required: true
type: string
first:
required: true
type: string
last:
required: true
type: string
email:
required: true
type: string
regex: ^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$
password:
required: true
type: string
password_temporary:
required: false
type: boolean
default: true
quota:
required: true
type: string
enabled:
required: true
type: boolean
role:
required: true
type: string
empty: false
groups:
required: true
type: list

View File

@ -0,0 +1,31 @@
first:
required: false
type: string
last:
required: false
type: string
email:
required: false
type: string
regex: ^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$
password:
required: false
type: string
password_temporary:
required: false
type: boolean
default: true
quota:
required: false
type: string
enabled:
required: false
type: boolean
role:
required: false
type: string
empty: false
groups:
required: false
type: list

View File

@ -1,545 +1,342 @@
#!flask/bin/python
# coding=utf-8
import concurrent.futures
import json
import logging as log
import os
import re
import socket
import sys
# import Queue
import threading
import time
import traceback
from uuid import uuid4
from flask import Response, jsonify, redirect, render_template, request, url_for
from flask_login import current_user, login_required
from flask import request
from admin import app
from ..lib.helpers import system_group
from .decorators import is_admin
threads = {"external": None}
# q = Queue.Queue()
from keycloak.exceptions import KeycloakGetError
from ..lib.dashboard import Dashboard
from ..lib.exceptions import UserExists, UserNotFound
dashboard = Dashboard()
from ..lib.api_exceptions import Error
from .decorators import has_token
@app.route("/sysadmin/api/resync")
@app.route("/api/resync")
@login_required
def resync():
return (
json.dumps(app.admin.resync_data()),
200,
{"Content-Type": "application/json"},
)
## LISTS
@app.route("/ddapi/users", methods=["GET"])
@has_token
def ddapi_users():
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
users = []
for user in sorted_users:
users.append(user_parser(user))
return json.dumps(users), 200, {"Content-Type": "application/json"}
@app.route("/api/users", methods=["GET", "PUT"])
@app.route("/api/users/<provider>", methods=["POST", "PUT", "GET", "DELETE"])
@login_required
def users(provider=False):
if request.method == "DELETE":
if current_user.role != "admin":
return json.dumps({}), 301, {"Content-Type": "application/json"}
if provider == "keycloak":
return (
json.dumps(app.admin.delete_keycloak_users()),
200,
{"Content-Type": "application/json"},
)
if provider == "nextcloud":
return (
json.dumps(app.admin.delete_nextcloud_users()),
200,
{"Content-Type": "application/json"},
)
if provider == "moodle":
return (
json.dumps(app.admin.delete_moodle_users()),
200,
{"Content-Type": "application/json"},
)
@app.route("/ddapi/users/filter", methods=["POST"])
@has_token
def ddapi_users_search():
if request.method == "POST":
if current_user.role != "admin":
return json.dumps({}), 301, {"Content-Type": "application/json"}
if provider == "moodle":
return (
json.dumps(app.admin.sync_to_moodle()),
200,
{"Content-Type": "application/json"},
)
if provider == "nextcloud":
return (
json.dumps(app.admin.sync_to_nextcloud()),
200,
{"Content-Type": "application/json"},
)
if request.method == "PUT" and not provider:
if current_user.role != "admin":
return json.dumps({}), 301, {"Content-Type": "application/json"}
data = request.get_json(force=True)
if not data.get("text"):
raise Error("bad_request", "Incorrect data requested.")
users = app.admin.get_mix_users()
result = [user_parser(user) for user in filter_users(users, data["text"])]
sorted_result = sorted(result, key=lambda k: k["id"])
return json.dumps(sorted_result), 200, {"Content-Type": "application/json"}
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already working with users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.update_users_from_keycloak, args=()
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Add user error."}),
500,
{"Content-Type": "application/json"},
)
# return json.dumps(app.admin.update_users_from_keycloak()), 200, {'Content-Type': 'application/json'}
@app.route("/ddapi/groups", methods=["GET"])
@has_token
def ddapi_groups():
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: k["name"])
groups = []
for group in sorted_groups:
groups.append(group_parser(group))
return json.dumps(groups), 200, {"Content-Type": "application/json"}
users = app.admin.get_mix_users()
if current_user.role != "admin":
for user in users:
user["keycloak_groups"] = [
g for g in user["keycloak_groups"] if not system_group(g)
@app.route("/ddapi/group/users", methods=["POST"])
@has_token
def ddapi_group_users():
if request.method == "POST":
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
if data.get("id"):
group_users = [
user_parser(user)
for user in sorted_users
if data.get("id") in user["keycloak_groups"]
]
return json.dumps(users), 200, {"Content-Type": "application/json"}
@app.route("/api/users_bulk/<action>", methods=["PUT"])
@login_required
def users_bulk(action):
data = request.get_json(force=True)
if request.method == "PUT":
if action == "enable":
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already operating users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
elif data.get("path"):
try:
threads["external"] = threading.Thread(
target=app.admin.enable_users, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["path"] == data.get("path")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Enable users error."}),
500,
{"Content-Type": "application/json"},
)
if action == "disable":
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already operating users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
raise Error("not_found", "Group path not found in system")
elif data.get("keycloak_id"):
try:
threads["external"] = threading.Thread(
target=app.admin.disable_users, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
name = [
g["name"]
for g in app.admin.get_mix_groups()
if g["id"] == data.get("keycloak_id")
][0]
group_users = [
user_parser(user)
for user in sorted_users
if name in user["keycloak_groups"]
]
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Disabling users error."}),
500,
{"Content-Type": "application/json"},
)
if action == "delete":
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps(
{"msg": "Precondition failed: already operating users"}
),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.delete_users, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Deleting users error."}),
500,
{"Content-Type": "application/json"},
)
return json.dumps({}), 405, {"Content-Type": "application/json"}
# Update pwd
@app.route("/api/user_password", methods=["GET"])
@app.route("/api/user_password/<userid>", methods=["PUT"])
@login_required
def user_password(userid=False):
if request.method == "GET":
return (
json.dumps(app.admin.get_dice_pwd()),
200,
{"Content-Type": "application/json"},
)
if request.method == "PUT":
data = request.get_json(force=True)
password = data["password"]
temporary = data.get("temporary", True)
try:
res = app.admin.user_update_password(userid, password, temporary)
return json.dumps({}), 200, {"Content-Type": "application/json"}
except KeycloakGetError as e:
log.error(e.error_message.decode("utf-8"))
return (
json.dumps({"msg": "Update password error."}),
500,
{"Content-Type": "application/json"},
)
return json.dumps({}), 405, {"Content-Type": "application/json"}
# User
@app.route("/api/user", methods=["POST"])
@app.route("/api/user/<userid>", methods=["PUT", "GET", "DELETE"])
@login_required
def user(userid=None):
if request.method == "DELETE":
app.admin.delete_user(userid)
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "POST":
data = request.get_json(force=True)
if app.admin.get_user_username(data["username"]):
return (
json.dumps({"msg": "Add user error: already exists."}),
409,
{"Content-Type": "application/json"},
)
data["enabled"] = True if data.get("enabled", False) else False
data["quota"] = data["quota"] if data["quota"] != "false" else False
data["groups"] = data["groups"] if data.get("groups", False) else []
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return (
json.dumps({"msg": "Precondition failed: already adding users"}),
412,
{"Content-Type": "application/json"},
)
else:
threads["external"] = None
try:
threads["external"] = threading.Thread(
target=app.admin.add_user, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
except:
log.error(traceback.format_exc())
return (
json.dumps({"msg": "Add user error."}),
500,
{"Content-Type": "application/json"},
)
if request.method == "PUT":
data = request.get_json(force=True)
data["enabled"] = True if data.get("enabled", False) else False
data["groups"] = data["groups"] if data.get("groups", False) else []
data["roles"] = [data.pop("role-keycloak")]
try:
app.admin.user_update(data)
return json.dumps({}), 200, {"Content-Type": "application/json"}
except UserNotFound:
return (
json.dumps({"msg": "User not found."}),
404,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
pass
if request.method == "GET":
user = app.admin.get_user(userid)
if not user:
return (
json.dumps({"msg": "User not found."}),
404,
{"Content-Type": "application/json"},
)
return json.dumps(user), 200, {"Content-Type": "application/json"}
@app.route("/api/roles")
@login_required
def roles():
sorted_roles = sorted(app.admin.get_roles(), key=lambda k: k["name"])
if current_user.role != "admin":
sorted_roles = [sr for sr in sorted_roles if sr["name"] != "admin"]
return json.dumps(sorted_roles), 200, {"Content-Type": "application/json"}
@app.route("/api/group", methods=["POST", "DELETE"])
@app.route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
@login_required
def group(group_id=False):
if request.method == "POST":
data = request.get_json(force=True)
data["parent"] = data["parent"] if data["parent"] != "" else None
return (
json.dumps(app.admin.add_group(data)),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
try:
data = request.get_json(force=True)
except:
data = False
if data:
res = app.admin.delete_group_by_path(data["path"])
raise Error("not_found", "Group keycloak_id not found in system")
else:
res = app.admin.delete_group_by_id(group_id)
return json.dumps(res), 200, {"Content-Type": "application/json"}
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(group_users), 200, {"Content-Type": "application/json"}
@app.route("/api/groups")
@app.route("/api/groups/<provider>", methods=["POST", "PUT", "GET", "DELETE"])
@login_required
def groups(provider=False):
@app.route("/ddapi/roles", methods=["GET"])
@has_token
def ddapi_roles():
if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: str(k["name"]))
if current_user.role != "admin":
## internal groups should be avoided as are assigned with the role
sorted_groups = [sg for sg in sorted_groups if not system_group(sg["name"])]
else:
sorted_groups = [sg for sg in sorted_groups]
return json.dumps(sorted_groups), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
if provider == "keycloak":
return (
json.dumps(app.admin.delete_keycloak_groups()),
200,
{"Content-Type": "application/json"},
)
### SYSADM USERS ONLY
@app.route("/api/external", methods=["POST", "PUT", "GET", "DELETE"])
@login_required
def external():
if "external" in threads.keys():
if threads["external"] is not None and threads["external"].is_alive():
return json.dumps({}), 301, {"Content-Type": "application/json"}
else:
threads["external"] = None
if request.method == "POST":
data = request.get_json(force=True)
if data["format"] == "json-ga":
threads["external"] = threading.Thread(
target=app.admin.upload_json_ga, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
if data["format"] == "csv-ug":
valid = check_upload_errors(data)
if valid["pass"]:
threads["external"] = threading.Thread(
target=app.admin.upload_csv_ug, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
else:
return json.dumps(valid), 422, {"Content-Type": "application/json"}
if request.method == "PUT":
data = request.get_json(force=True)
threads["external"] = threading.Thread(
target=app.admin.sync_external, args=(data,)
)
threads["external"].start()
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
print("RESET")
app.admin.reset_external()
return json.dumps({}), 200, {"Content-Type": "application/json"}
return json.dumps({}), 500, {"Content-Type": "application/json"}
@app.route("/api/external/users")
@login_required
def external_users_list():
while threads["external"] is not None and threads["external"].is_alive():
time.sleep(0.5)
return (
json.dumps(app.admin.get_external_users()),
200,
{"Content-Type": "application/json"},
)
@app.route("/api/external/groups")
@login_required
def external_groups_list():
while threads["external"] is not None and threads["external"].is_alive():
time.sleep(0.5)
return (
json.dumps(app.admin.get_external_groups()),
200,
{"Content-Type": "application/json"},
)
@app.route("/api/external/roles", methods=["PUT"])
@login_required
def external_roles():
if request.method == "PUT":
return (
json.dumps(app.admin.external_roleassign(request.get_json(force=True))),
200,
{"Content-Type": "application/json"},
)
def check_upload_errors(data):
email_regex = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
for u in data["data"]:
try:
user_groups = [g.strip() for g in u["groups"].split(",")]
except:
resp = {
"pass": False,
"msg": "User " + u["username"] + " has invalid groups: " + u["groups"],
}
log.error(resp)
return resp
if not re.fullmatch(email_regex, u["email"]):
resp = {
"pass": False,
"msg": "User " + u["username"] + " has invalid email: " + u["email"],
}
log.error(resp)
return resp
if u["role"] not in ["admin", "manager", "teacher", "student"]:
if u["role"] == "":
resp = {
"pass": False,
"msg": "User " + u["username"] + " has no role assigned!",
roles = []
for role in sorted(app.admin.get_roles(), key=lambda k: k["name"]):
log.error(role)
roles.append(
{
"keycloak_id": role["id"],
"id": role["name"],
"name": role["name"],
"description": role.get("description", ""),
}
log.error(resp)
return resp
resp = {
"pass": False,
"msg": "User " + u["username"] + " has invalid role: " + u["role"],
}
log.error(resp)
return resp
return {"pass": True, "msg": ""}
@app.route("/api/dashboard/<item>", methods=["PUT"])
# @login_required
def dashboard_put(item):
if item == "colours":
try:
data = request.get_json(force=True)
dashboard.update_colours(data)
except:
log.error(traceback.format_exc())
return json.dumps({"colours": data}), 200, {"Content-Type": "application/json"}
if item == "menu":
try:
data = request.get_json(force=True)
dashboard.update_menu(data)
except:
log.error(traceback.format_exc())
return json.dumps(data), 200, {"Content-Type": "application/json"}
if item == "logo":
dashboard.update_logo(request.files["croppedImage"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if item == "background":
dashboard.update_background(request.files["croppedImage"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
return (
json.dumps(
{
"error": "update_error",
"msg": "Error updating item " + item + "\n" + traceback.format_exc(),
}
),
500,
{"Content-Type": "application/json"},
)
@app.route("/api/legal/<item>", methods=["GET", "POST"])
# @login_required
def legal_put(item):
if request.method == "GET":
if item == "legal":
lang = request.args.get("lang")
return (
json.dumps({"html": "<b>Legal</b><br>This works! in lang: " + lang}),
200,
{"Content-Type": "application/json"},
)
# if item == "privacy":
# return json.dumps({ "html": "<b>Privacy policy</b><br>This works!"}), 200, {'Content-Type': 'application/json'}
return json.dumps(roles), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/role/users", methods=["POST"])
@has_token
def ddapi_role_users():
if request.method == "POST":
if item == "legal":
data = None
data = request.get_json(force=True)
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
if data.get("id", data.get("name")):
role_users = [
user_parser(user)
for user in sorted_users
if data.get("id", data.get("name")) in user["roles"]
]
elif data.get("keycloak_id"):
try:
data = request.json
html = data["html"]
lang = data["lang"]
id = [
r["id"]
for r in app.admin.get_roles()
if r["id"] == data.get("keycloak_id")
][0]
role_users = [
user_parser(user) for user in sorted_users if id in user["roles"]
]
except:
log.error(traceback.format_exc())
return json.dumps(data), 200, {"Content-Type": "application/json"}
# if item == "privacy":
# data = None
# try:
# data = request.json
# html = data["html"]
# lang = data["lang"]
# except:
# log.error(traceback.format_exc())
# return json.dumps(data), 200, {'Content-Type': 'application/json'}
raise Error("not_found", "Role keycloak_id not found in system")
else:
raise Error("bad_request", "Incorrect data requested.")
return json.dumps(role_users), 200, {"Content-Type": "application/json"}
## INDIVIDUAL ACTIONS
@app.route("/ddapi/user", methods=["POST"])
@app.route("/ddapi/user/<user_ddid>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_user(user_ddid=None):
if request.method == "GET":
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
return json.dumps(user_parser(user)), 200, {"Content-Type": "application/json"}
if request.method == "DELETE":
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
app.admin.delete_user(user["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["user"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: ",
+str(app.validators["user"].errors),
traceback.format_exc(),
)
if app.admin.get_user_username(data["username"]):
raise Error("conflict", "User id already exists")
data = app.validators["user"].normalized(data)
keycloak_id = app.admin.add_user(data)
if not keycloak_id:
raise Error(
"precondition_required",
"Not all user groups already in system. Please create user groups before adding user.",
)
return (
json.dumps({"keycloak_id": keycloak_id}),
200,
{"Content-Type": "application/json"},
)
if request.method == "PUT":
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
data = request.get_json(force=True)
if not app.validators["user_update"].validate(data):
raise Error(
"bad_request",
"Data validation for user failed: "
+ str(app.validators["user_update"].errors),
traceback.format_exc(),
)
data = {**user, **data}
data = app.validators["user_update"].normalized(data)
data = {**data, **{"username": user_ddid}}
data["roles"] = [data.pop("role")]
data["firstname"] = data.pop("first")
data["lastname"] = data.pop("last")
app.admin.user_update(data)
if data.get("password"):
app.admin.user_update_password(
user["id"], data["password"], data["password_temporary"]
)
return json.dumps({}), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/username/<old_user_ddid>/<new_user_did>", methods=["PUT"])
@has_token
def ddapi_username(old_user_ddid, new_user_did):
user = app.admin.get_user_username(user_ddid)
if not user:
raise Error("not_found", "User id not found")
# user = app.admin.update_user_username(old_user_ddid,new_user_did)
return json.dumps("Not implemented yet!"), 419, {"Content-Type": "application/json"}
@app.route("/ddapi/group", methods=["POST"])
@app.route("/ddapi/group/<id>", methods=["GET", "POST", "DELETE"])
# @app.route("/api/group/<group_id>", methods=["PUT", "GET", "DELETE"])
@has_token
def ddapi_group(id=None):
if request.method == "GET":
group = app.admin.get_group_by_name(id)
if not group:
Error("not found", "Group id not found")
return (
json.dumps(group_parser(group)),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
if not app.validators["group"].validate(data):
raise Error(
"bad_request",
"Data validation for group failed: "
+ str(app.validators["group"].errors),
traceback.format_exc(),
)
data = app.validators["group"].normalized(data)
data["parent"] = data["parent"] if data["parent"] != "" else None
if app.admin.get_group_by_name(id):
raise Error("conflict", "Group id already exists")
path = app.admin.add_group(data)
# log.error(path)
# keycloak_id = app.admin.get_group_by_name(id)["id"]
# log.error()
return (
json.dumps({"keycloak_id": None}),
200,
{"Content-Type": "application/json"},
)
if request.method == "DELETE":
group = app.admin.get_group_by_name(id)
if not group:
raise Error("not_found", "Group id not found")
app.admin.delete_group_by_id(group["id"])
return json.dumps({}), 200, {"Content-Type": "application/json"}
@app.route("/ddapi/user_mail", methods=["POST"])
@app.route("/ddapi/user_mail/<id>", methods=["GET", "DELETE"])
@has_token
def ddapi_user_mail(id=None):
if request.method == "GET":
return (
json.dumps("Not implemented yet"),
200,
{"Content-Type": "application/json"},
)
if request.method == "POST":
data = request.get_json(force=True)
# if not app.validators["mails"].validate(data):
# raise Error(
# "bad_request",
# "Data validation for mail failed: "
# + str(app.validators["mail"].errors),
# traceback.format_exc(),
# )
for user in data:
if not app.validators["mail"].validate(user):
raise Error(
"bad_request",
"Data validation for mail failed: "
+ str(app.validators["mail"].errors),
traceback.format_exc(),
)
for user in data:
log.info("Added user email")
app.admin.set_nextcloud_user_mail(user)
return (
json.dumps("Users emails updated"),
200,
{"Content-Type": "application/json"},
)
def user_parser(user):
return {
"keycloak_id": user["id"],
"id": user["username"],
"username": user["username"],
"enabled": user["enabled"],
"first": user["first"],
"last": user["last"],
"role": user["roles"][0] if len(user["roles"]) else None,
"email": user["email"],
"groups": user.get("groups", user["keycloak_groups"]),
"quota": user["quota"],
"quota_used_bytes": user["quota_used_bytes"],
}
def group_parser(group):
return {
"keycloak_id": group["id"],
"id": group["name"],
"name": group["name"].split(".")[-1],
"path": group["path"],
"description": group.get("description", ""),
}
def filter_users(users, text):
return [
user
for user in users
if text in user["username"]
or text in user["first"]
or text in user["last"]
or text in user["email"]
]

View File

@ -518,6 +518,7 @@ def dashboard_put(item):
{"Content-Type": "application/json"},
)
@app.route("/api/legal/<item>", methods=["GET"])
# @login_required
def legal_get(item):

View File

@ -137,3 +137,10 @@ def web_sysadmin_external():
return render_template(
"pages/sysadmin/external.html", title="External", nav="External"
)
@app.route("/sockettest")
def web_sockettest():
return render_template(
"pages/sockettest.html", title="Sockettest Users", nav="SysAdminUsers"
)

View File

@ -3,6 +3,7 @@
import json
import logging as log
import os
import socket
import sys
import time
import traceback
@ -17,6 +18,7 @@ from .decorators import is_internal
@app.route("/api/internal/users", methods=["GET"])
@is_internal
def internal_users():
log.error(socket.gethostbyname("isard-apps-wordpress"))
if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]

View File

@ -1,11 +1,17 @@
#!flask/bin/python
# coding=utf-8
import json
import logging as log
import os
import socket
from functools import wraps
from flask import redirect, request, url_for
from flask_login import current_user, logout_user
from jose import jwt
from ..auth.tokens import get_header_jwt_payload
def is_admin(fn):
@ -30,7 +36,53 @@ def is_internal(fn):
## but we should check if it is internal net and not haproxy
if socket.gethostbyname("isard-apps-wordpress") == remote_addr:
return fn(*args, **kwargs)
logout_user()
return redirect(url_for("login"))
return (
json.dumps(
{
"error": "unauthorized",
"msg": "Unauthorized access",
}
),
401,
{"Content-Type": "application/json"},
)
return decorated_view
def has_token(fn):
@wraps(fn)
def decorated(*args, **kwargs):
payload = get_header_jwt_payload()
return fn(*args, **kwargs)
return decorated
def is_internal_or_has_token(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
remote_addr = (
request.headers["X-Forwarded-For"].split(",")[0]
if "X-Forwarded-For" in request.headers
else request.remote_addr.split(",")[0]
)
payload = get_header_jwt_payload()
if socket.gethostbyname("isard-apps-wordpress") == remote_addr:
return fn(*args, **kwargs)
payload = get_header_jwt_payload()
return fn(*args, **kwargs)
return decorated_view
def login_or_token(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
if current_user.is_authenticated:
return fn(*args, **kwargs)
payload = get_header_jwt_payload()
return fn(*args, **kwargs)
return decorated_view

View File

@ -6,7 +6,10 @@ monkey_patch()
import json
from flask_login import login_required
from admin.auth.tokens import get_token_payload
from admin.lib.api_exceptions import Error
from flask import request
from flask_login import current_user, login_required
from flask_socketio import (
SocketIO,
close_room,
@ -26,14 +29,36 @@ app.socketio = SocketIO(app)
@app.socketio.on("connect", namespace="/sio")
@login_required
def socketio_connect():
join_room("admin")
app.socketio.emit(
"update", json.dumps("Joined admins room"), namespace="/sio", room="admin"
)
if current_user.id:
join_room("admin")
app.socketio.emit(
"update", json.dumps("Joined admins room"), namespace="/sio", room="admin"
)
else:
None
@app.socketio.on("disconnect", namespace="/sio")
def socketio_disconnect():
None
leave_room("admin")
@app.socketio.on("connect", namespace="/sio/events")
def socketio_connect():
jwt = get_token_payload(request.args.get("jwt"))
join_room("events")
app.socketio.emit(
"update",
json.dumps("Joined events room"),
namespace="/sio/events",
room="events",
)
@app.socketio.on("disconnect", namespace="/sio/events")
def socketio_events_disconnect():
leave_room("events")
if __name__ == "__main__":
@ -42,8 +67,8 @@ if __name__ == "__main__":
host="0.0.0.0",
port=9000,
debug=False,
# ssl_context="adhoc",
# async_mode="threading",
) # , logger=logger, engineio_logger=engineio_logger)
)
# ssl_context="adhoc",
# async_mode="threading",
# ) # , logger=logger, engineio_logger=engineio_logger)
# , cors_allowed_origins="*"
# /usr/lib/python3.8/site-packages/certifi

423
admin/src/tests/api.py Normal file
View File

@ -0,0 +1,423 @@
import json
import os
import secrets
import time
import traceback
from datetime import datetime, timedelta
from pprint import pprint
import requests
from jose import jwt
## SETUP
domain = "admin.[YOURDOMAIN]"
secret = "[your API_SECRET]"
## END SETUP
auths = {}
dbconn = None
base = "https://" + domain + "/ddapi"
raw_jwt_data = {
"exp": datetime.utcnow() + timedelta(minutes=5),
"kid": "test",
}
admin_jwt = jwt.encode(raw_jwt_data, secret, algorithm="HS256")
jwt = {"Authorization": "Bearer " + admin_jwt}
#######################################################################################################################
print(" ----- USUARIS AL SISTEMA")
response = requests.get(
base + "/users",
headers=jwt,
verify=True,
)
print("METHOD: GET, URL: " + base + "/users, STATUS_CODE:" + str(response.status_code))
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text)[:2])
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- USUARIS AL SISTEMA QUE CONTENEN UN TEXT")
data = {"text": "alu"}
response = requests.post(
base + "/users/filter",
json=data,
headers=jwt,
verify=True,
)
print(
"METHOD: POST, URL: "
+ base
+ "/users/filter, STATUS_CODE:"
+ str(response.status_code)
+ ", POST DATA:"
)
pprint(data)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text)[:2])
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- GRUPS AL SISTEMA")
response = requests.get(
base + "/groups",
headers=jwt,
verify=True,
)
print("METHOD: GET, URL: " + base + "/groups, STATUS_CODE:" + str(response.status_code))
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text)[:2])
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- USUARIS DEL GRUP")
data = {"id": "test00.classeB"}
response = requests.post(
base + "/group/users",
json=data,
headers=jwt,
verify=True,
)
print(
"METHOD: POST, URL: "
+ base
+ "/group/users, STATUS_CODE:"
+ str(response.status_code)
+ ", POST DATA:"
)
pprint(data)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text)[:2])
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- ROLS AL SISTEMA")
response = requests.get(
base + "/roles",
headers=jwt,
verify=True,
)
print("METHOD: GET, URL: " + base + "/roles, STATUS_CODE:" + str(response.status_code))
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text)[:2])
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- USUARIS DEL ROL")
data = {"id": "teacher"}
response = requests.post(
base + "/role/users",
json=data,
headers=jwt,
verify=True,
)
print(
"METHOD: POST, URL: "
+ base
+ "/role/users, STATUS_CODE:"
+ str(response.status_code)
+ ", POST DATA:"
)
pprint(data)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text)[:2])
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
print("\nXXXXXXXXXXXXXXXXX ACTIONS ON USER XXXXXXXXXXXXXXXXXXXXXX\n")
#######################################################################################################################
print(" ----- GET USER")
response = requests.get(
base + "/user/nou.usuari",
headers=jwt,
verify=True,
)
print(
"METHOD: GET, URL: "
+ base
+ "/user/nou.usuari, STATUS_CODE:"
+ str(response.status_code)
)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- DELETE USER")
response = requests.delete(
base + "/user/nou.usuari",
headers=jwt,
verify=True,
)
print(
"METHOD: DELETE, URL: "
+ base
+ "/user/nou.usuari, STATUS_CODE:"
+ str(response.status_code)
)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- POST NEW USER")
user = {
"username": "nou.usuari",
"first": "Nou",
"last": "Usuari",
"email": "nou.usuari@nodns.com",
"password": "1n2n3n4n5n6",
"quota": "default",
"enabled": True,
"role": "student",
"groups": ["test00.classeB"],
}
response = requests.post(
base + "/user",
json=user,
headers=jwt,
verify=True,
)
print(
"METHOD: POST, URL: "
+ base
+ "/user, STATUS_CODE:"
+ str(response.status_code)
+ ", POST DATA:"
)
pprint(user)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- UPDATE USER")
update_user = {
"id": "nou.usuari",
"email": "nou.usuari@nodns.com",
"enabled": True,
"first": "Antic",
"groups": ["test00.classeB"],
"last": "Usuari",
"quota": "default",
"quota_used_bytes": "0 MB",
"role": "teacher",
}
response = requests.put(
base + "/user/nou.usuari",
json=update_user,
headers=jwt,
verify=True,
)
print(
"METHOD: PUT, URL: "
+ base
+ "/user/nou.usuari, STATUS_CODE:"
+ str(response.status_code)
+ ", PUT DATA:"
)
pprint(update_user)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- GET USER")
response = requests.get(
base + "/user/nou.usuari",
headers=jwt,
verify=True,
)
print(
"METHOD: GET, URL: "
+ base
+ "/user/nou.usuari, STATUS_CODE:"
+ str(response.status_code)
)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- DELETE USER")
response = requests.delete(
base + "/user/nou.usuari",
headers=jwt,
verify=True,
)
print(
"METHOD: DELETE, URL: "
+ base
+ "/user/nou.usuari, STATUS_CODE:"
+ str(response.status_code)
)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
print("\nXXXXXXXXXXXXXXXXX ACTIONS ON GROUP XXXXXXXXXXXXXXXXXXXXXX\n")
#######################################################################################################################
print(" ----- GET GROUP")
response = requests.get(
base + "/group/teacher",
headers=jwt,
verify=True,
)
print(
"METHOD: GET, URL: "
+ base
+ "/group/teacher, STATUS_CODE:"
+ str(response.status_code)
)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- POST NEW GROUP")
group = {"name": "test"}
response = requests.post(
base + "/group",
json=group,
headers=jwt,
verify=True,
)
print(
"METHOD: POST, URL: "
+ base
+ "/group, STATUS_CODE:"
+ str(response.status_code)
+ ", POST DATA:"
)
pprint(group)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)
#######################################################################################################################
print(" ----- DELETE GROUP")
response = requests.delete(
base + "/group/test",
headers=jwt,
verify=True,
)
print(
"METHOD: DELETE, URL: "
+ base
+ "/group/test, STATUS_CODE:"
+ str(response.status_code)
)
if response.status_code == 200:
print("RESPONSE:")
pprint(json.loads(response.text))
else:
print(
"ERROR: "
+ json.loads(response.text)["error"]
+ " DESCRIPTION: "
+ json.loads(response.text)["description"]
)