feat(admin): opened basic api with jwt auth

darta 2022-04-15 19:22:34 +02:00
parent b03cdd162c
commit c2fc854f16
4 changed files with 66 additions and 7 deletions

View File

@ -12,5 +12,6 @@ minio==7.0.3
urllib3==1.26.6 urllib3==1.26.6
schema==0.7.5 schema==0.7.5
Werkzeug~=2.0.0 Werkzeug~=2.0.0
python-jose==3.3.0
# Unused yet # Unused yet
#flask-oidc==1.4.0 #flask-oidc==1.4.0

View File

@ -34,6 +34,9 @@ class loadConfig:
app.config.setdefault( app.config.setdefault(
"VERIFY", True if os.environ["VERIFY"] == "true" else False "VERIFY", True if os.environ["VERIFY"] == "true" else False
) )
app.config.setdefault(
"API_SECRET", os.environ.get("API_SECRET")
)
except Exception as e: except Exception as e:
log.error(traceback.format_exc()) log.error(traceback.format_exc())
raise raise

View File

@ -11,12 +11,14 @@ from flask import request
from admin import app from admin import app
from .decorators import is_internal from .decorators import is_internal, is_internal_or_has_token
import socket
@app.route("/api/internal/users", methods=["GET"]) @app.route("/api/internal/users", methods=["GET"])
@is_internal @is_internal_or_has_token
def internal_users(): def internal_users():
log.error(socket.gethostbyname("isard-apps-wordpress"))
if request.method == "GET": if request.method == "GET":
sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"]) sorted_users = sorted(app.admin.get_mix_users(), key=lambda k: k["username"])
# group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']] # group_users = [user for user in sorted_users if data['path'] in user['keycloak_groups']]
@ -29,7 +31,7 @@ def internal_users():
@app.route("/api/internal/users/filter", methods=["POST"]) @app.route("/api/internal/users/filter", methods=["POST"])
@is_internal @is_internal_or_has_token
def internal_users_search(): def internal_users_search():
if request.method == "POST": if request.method == "POST":
data = request.get_json(force=True) data = request.get_json(force=True)
@ -40,7 +42,7 @@ def internal_users_search():
@app.route("/api/internal/groups", methods=["GET"]) @app.route("/api/internal/groups", methods=["GET"])
@is_internal @is_internal_or_has_token
def internal_groups(): def internal_groups():
if request.method == "GET": if request.method == "GET":
sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: k["name"]) sorted_groups = sorted(app.admin.get_mix_groups(), key=lambda k: k["name"])
@ -59,7 +61,7 @@ def internal_groups():
@app.route("/api/internal/group/users", methods=["POST"]) @app.route("/api/internal/group/users", methods=["POST"])
@is_internal @is_internal_or_has_token
def internal_group_users(): def internal_group_users():
if request.method == "POST": if request.method == "POST":
data = request.get_json(force=True) data = request.get_json(force=True)
@ -78,7 +80,7 @@ def internal_group_users():
@app.route("/api/internal/roles", methods=["GET"]) @app.route("/api/internal/roles", methods=["GET"])
@is_internal @is_internal_or_has_token
def internal_roles(): def internal_roles():
if request.method == "GET": if request.method == "GET":
roles = [] roles = []
@ -96,7 +98,7 @@ def internal_roles():
@app.route("/api/internal/role/users", methods=["POST"]) @app.route("/api/internal/role/users", methods=["POST"])
@is_internal @is_internal_or_has_token
def internal_role_users(): def internal_role_users():
if request.method == "POST": if request.method == "POST":
data = request.get_json(force=True) data = request.get_json(force=True)

View File

@ -3,6 +3,10 @@
import socket import socket
from functools import wraps from functools import wraps
import json
import os
from jose import jwt
from ..auth.tokens import get_header_jwt_payload
from flask import redirect, request, url_for from flask import redirect, request, url_for
from flask_login import current_user, logout_user from flask_login import current_user, logout_user
@ -34,3 +38,52 @@ def is_internal(fn):
return redirect(url_for("login")) return redirect(url_for("login"))
return decorated_view return decorated_view
def has_token(fn):
@wraps(fn)
def decorated(*args, **kwargs):
payload = get_header_jwt_payload()
# if payload.get("role_id") != "admin":
# maintenance()
kwargs["payload"] = payload
return fn(*args, **kwargs)
return decorated
def is_internal_or_has_token(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
remote_addr = (
request.headers["X-Forwarded-For"].split(",")[0]
if "X-Forwarded-For" in request.headers
else request.remote_addr.split(",")[0]
)
## Now only checks if it is wordpress container,
## but we should check if it is internal net and not haproxy
valid_jwt = False
try:
payload = get_header_jwt_payload()
valid_jwt = True
except:
valid_jwt = False
if valid_jwt:
return fn(*args, **kwargs)
else:
return (
json.dumps(
{
"error": "unauthorized",
"msg": "Unauthorized access",
}
),
401,
{"Content-Type": "application/json"},
)
if socket.gethostbyname("isard-apps-wordpress") == remote_addr:
return fn(*args, **kwargs)
else:
logout_user()
return redirect(url_for("login"))
return decorated_view