digitaldemocratic/dd-sso/admin/src/admin/lib/nextcloud.py

601 lines
25 KiB
Python

#
# Copyright © 2021,2022 IsardVDI S.L.
# Copyright © 2022 Evilham <contact@evilham.com>
#
# This file is part of DD
#
# DD is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# DD is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with DD. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
import json
import logging as log
import os
import pprint
import time
import traceback
import urllib
import requests
from psycopg2 import sql
from .nextcloud_exc import *
from .postgres import Postgres
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
if TYPE_CHECKING:
from admin.flaskapp import AdminFlaskApp
DDUser = Dict[Any, Any]
class Nextcloud:
verify_cert : bool
apiurl : str
shareurl : str
davurl : str
auth : Tuple[str, str]
user : str
nextcloud_pg : Postgres
def __init__(
self,
app : "AdminFlaskApp",
username : str=os.environ.get("NEXTCLOUD_ADMIN_USER", ""),
password : str=os.environ.get("NEXTCLOUD_ADMIN_PASSWORD", ""),
verify : bool=True,
) -> None:
url = "https://nextcloud." + app.config["DOMAIN"]
self.verify_cert = verify
self.apiurl = url + "/ocs/v1.php/cloud/"
self.shareurl = url + "/ocs/v2.php/apps/files_sharing/api/v1/"
self.davurl = url + "/remote.php/dav/files/"
self.auth = (username, password)
self.user = username
self.nextcloud_pg = Postgres(
"dd-apps-postgresql",
"nextcloud",
app.config["NEXTCLOUD_POSTGRES_USER"],
app.config["NEXTCLOUD_POSTGRES_PASSWORD"],
)
def _request(
self, method : str, url : str, data : Any={}, headers : Dict[str, str]={"OCS-APIRequest": "true"}, auth : Optional[Tuple[str, str]]=None
) -> str:
if auth is None:
auth = self.auth
try:
response = requests.request(
method,
url,
data=data,
auth=auth,
verify=self.verify_cert,
headers=headers,
)
if "meta" in response.text:
if "<statuscode>997</statuscode>" in response.text:
raise ProviderUnauthorized
# if '<statuscode>998</statuscode>' in response.text: raise ProviderInvalidQuery
return response.text
## At least the ProviderSslError is not being catched or not raised correctly
except requests.exceptions.HTTPError as errh:
raise ProviderConnError
except requests.exceptions.Timeout as errt:
raise ProviderConnTimeout
except requests.exceptions.SSLError as err:
raise ProviderSslError
except requests.exceptions.ConnectionError as errc:
raise ProviderConnError
# except requests.exceptions.RequestException as err:
# raise ProviderError
except Exception as e:
if str(e) == "an integer is required (got type bytes)":
raise ProviderConnError
raise ProviderError
def check_connection(self) -> bool:
url = self.apiurl + "users/" + self.user + "?format=json"
try:
result = self._request("GET", url)
if json.loads(result)["ocs"]["meta"]["statuscode"] == 100:
return True
raise ProviderError
except requests.exceptions.HTTPError as errh:
raise ProviderConnError
except requests.exceptions.ConnectionError as errc:
raise ProviderConnError
except requests.exceptions.Timeout as errt:
raise ProviderConnTimeout
except requests.exceptions.SSLError as err:
raise ProviderSslError
except requests.exceptions.RequestException as err:
raise ProviderError
except Exception as e:
if str(e) == "an integer is required (got type bytes)":
raise ProviderConnError
raise ProviderError
def get_user(self, userid : str) -> Any:
url = self.apiurl + "users/" + userid + "?format=json"
try:
result = json.loads(self._request("GET", url))
if result["ocs"]["meta"]["statuscode"] == 100:
return result["ocs"]["data"]
raise ProviderItemNotExists
except:
# log.error(traceback.format_exc())
raise
# 100 - successful
# q = """select u.uid as username, adn.value as displayname, ade.value as email, json_agg(gg.displayname) as admin_groups,json_agg(g.displayname) as groups
# from oc_users as u
# left join oc_group_user as gu on gu.uid = u.uid
# left join oc_groups as g on gu.gid = g.gid
# left join oc_group_admin as ga on ga.uid = u.uid
# left join oc_groups as gg on gg.gid = ga.gid
# left join oc_accounts_data as adn on adn.uid = u.uid and adn.name = 'displayname'
# left join oc_accounts_data as ade on ade.uid = u.uid and ade.name = 'email'
# group by u.uid, adn.value, ade.value"""
# cur.execute(q)
# users = cur.fetchall()
# fields = [a.name for a in cur.description]
# cur.close()
# conn.close()
# users_with_lists = [list(l[:-2])+([[]] if l[-2] == [None] else [list(set(l[-2]))]) + ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users]
# users_with_lists = [list(l[:-2])+([[]] if l[-2] == [None] else [list(set(l[-2]))]) + ([[]] if l[-1] == [None] else [list(set(l[-1]))]) for l in users_with_lists]
# list_dict_users = [dict(zip(fields, r)) for r in users_with_lists]
def get_users_list(self) -> List[DDUser]:
# q = """select u.uid as username, adn.value as displayname, ade.value as email, json_agg(gg.displayname) as admin_groups,json_agg(g.displayname) as groups
# from oc_users as u
# left join oc_group_user as gu on gu.uid = u.uid
# left join oc_groups as g on gu.gid = g.gid
# left join oc_group_admin as ga on ga.uid = u.uid
# left join oc_groups as gg on gg.gid = ga.gid
# left join oc_accounts_data as adn on adn.uid = u.uid and adn.name = 'displayname'
# left join oc_accounts_data as ade on ade.uid = u.uid and ade.name = 'email'
# group by u.uid, adn.value, ade.value"""
# With quotas
q = """select u.uid as username, configvalue as quota, sum(size) as total_bytes, adn.value as displayname, ade.value as email, json_agg(gg.displayname) as admin_groups,json_agg(g.displayname) as groups
from oc_accounts as u
left join oc_group_user as gu on gu.uid = u.uid
left join oc_groups as g on gu.gid = g.gid
left join oc_group_admin as ga on ga.uid = u.uid
left join oc_groups as gg on gg.gid = ga.gid
left join oc_accounts_data as adn on adn.uid = u.uid and adn.name = 'displayname'
left join oc_accounts_data as ade on ade.uid = u.uid and ade.name = 'email'
left join oc_preferences as pref on u.uid=pref.userid and appid='files' and configkey='quota'
left join oc_storages as s on s.id=CONCAT('home::',u.uid)
left join oc_filecache as fc on fc.storage = numeric_id
group by u.uid, adn.value, ade.value, pref.configvalue"""
(headers, users) = self.nextcloud_pg.select_with_headers(q)
users_with_lists = [
list(l[:-2])
+ ([[]] if l[-2] == [None] else [list(set(l[-2]))])
+ ([[]] if l[-1] == [None] else [list(set(l[-1]))])
for l in users
]
users_with_lists = [
list(l[:-2])
+ ([[]] if l[-2] == [None] else [list(set(l[-2]))])
+ ([[]] if l[-1] == [None] else [list(set(l[-1]))])
for l in users_with_lists
]
list_dict_users = [dict(zip(headers, r)) for r in users_with_lists]
return list_dict_users
### Too slow...
# def get_users_list(self):
# url = self.apiurl + "users?format=json"
# try:
# result = json.loads(self._request('GET',url))
# if result['ocs']['meta']['statuscode'] == 100: return result['ocs']['data']['users']
# log.error('Get Nextcloud provider users list error: '+str(result))
# raise ProviderOpError
# except:
# log.error(traceback.format_exc())
# raise
# TODO: Improve typing of these functions...
def add_user(
self, userid : str, userpassword : str, quota : Any=False, group : Any=False, email : str="", displayname : str=""
) -> bool:
data = {
"userid": userid,
"password": userpassword,
"quota": quota,
"groups[]": group,
"email": email,
"displayname": displayname,
}
if not group:
del data["groups[]"]
if not quota:
del data["quota"]
# if group:
# data={'userid':userid,'password':userpassword,'quota':quota,'groups[]':group,'email':email,'displayname':displayname}
# else:
# data={'userid':userid,'password':userpassword,'quota':quota,'email':email,'displayname':displayname}
url = self.apiurl + "users?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(self._request("POST", url, data=data, headers=headers))
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 102:
raise ProviderItemExists
if result["ocs"]["meta"]["statuscode"] == 104:
self.add_group(group)
# raise ProviderGroupNotExists
log.error("Get Nextcloud provider user add error: " + str(result))
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
# 100 - successful
# 101 - invalid input data
# 102 - username already exists
# 103 - unknown error occurred whilst adding the user
# 104 - group does not exist
# 105 - insufficient privileges for group
# 106 - no group specified (required for subadmins)
# 107 - all errors that contain a hint - for example “Password is among the 1,000,000 most common ones. Please make it unique.” (this code was added in 12.0.6 & 13.0.1)
def update_user(self, userid : str, key_values : Dict[str, Any]) -> bool:
# key_values={'quota':quota,'email':email,'displayname':displayname}
url = f"{self.apiurl}users/{userid}"
headers = {
"Accept": "application/json",
"OCS-APIRequest": "true",
}
for k, v in key_values.items():
data = {"key": k, "value": v}
try:
result = json.loads(
self._request("PUT", url, data=data, headers=headers)
)
if result["ocs"]["meta"]["statuscode"] == 102:
if result["ocs"]["meta"].get("message") != "Invalid displayname":
# Workaround for https://github.com/nextcloud/server/issues/33751
# Fixed on NC 25
raise ProviderItemExists
elif result["ocs"]["meta"]["statuscode"] == 104:
raise ProviderGroupNotExists
elif result["ocs"]["meta"]["statuscode"] != 100:
log.error("Get Nextcloud provider user add error: " + str(result))
raise ProviderOpError
except:
raise
return True
def add_user_to_group(self, userid : str, group_id : str) -> bool:
data = {"groupid": group_id}
url = self.apiurl + "users/" + userid + "/groups?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(self._request("POST", url, data=data, headers=headers))
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 102:
raise ProviderItemExists
if result["ocs"]["meta"]["statuscode"] == 104:
raise ProviderGroupNotExists
log.error("Get Nextcloud provider user add error: " + str(result))
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
def remove_user_from_group(self, userid : str, group_id : str) -> bool:
data = {"groupid": group_id}
url = self.apiurl + "users/" + userid + "/groups?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(
self._request("DELETE", url, data=data, headers=headers)
)
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 102:
raise ProviderItemExists
# TODO: It is unclear what status code 104 is, it certainly
# shouldn't the group if it doesn't exist
#if result["ocs"]["meta"]["statuscode"] == 104:
# self.add_group(group)
# # raise ProviderGroupNotExists
log.error("Get Nextcloud provider user add error: " + str(result))
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
# TODO: Improve typing of these functions...
def add_user_with_groups(
self, userid : str, userpassword : str, quota : Any=False, groups : Any=[], email : str="", displayname : str=""
) -> bool:
data = {
"userid": userid,
"password": userpassword,
"quota": quota,
"groups[]": groups,
"email": email,
"displayname": displayname,
}
# if not group: del data['groups[]']
if not quota:
del data["quota"]
# if group:
# data={'userid':userid,'password':userpassword,'quota':quota,'groups[]':group,'email':email,'displayname':displayname}
# else:
# data={'userid':userid,'password':userpassword,'quota':quota,'email':email,'displayname':displayname}
url = self.apiurl + "users?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(self._request("POST", url, data=data, headers=headers))
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 102:
raise ProviderItemExists
if result["ocs"]["meta"]["statuscode"] == 104:
# self.add_group(group)
pass
# raise ProviderGroupNotExists
log.error("Get Nextcloud provider user add error: " + str(result))
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
# 100 - successful
# 101 - invalid input data
# 102 - username already exists
# 103 - unknown error occurred whilst adding the user
# 104 - group does not exist
# 105 - insufficient privileges for group
# 106 - no group specified (required for subadmins)
# 107 - all errors that contain a hint - for example “Password is among the 1,000,000 most common ones. Please make it unique.” (this code was added in 12.0.6 & 13.0.1)
def delete_user(self, userid : str) -> bool:
url = self.apiurl + "users/" + userid + "?format=json"
try:
result = json.loads(self._request("DELETE", url))
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 101:
raise ProviderUserNotExists
# log.error(traceback.format_exc())
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
# 100 - successful
# 101 - failure
def enable_user(self, userid : str) -> None:
pass
def disable_user(self, userid : str) -> None:
pass
def exists_user_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> bool:
auth = (userid, userpassword)
url = self.davurl + userid + "/" + folder + "?format=json"
headers = {
"Depth": "0",
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = self._request("PROPFIND", url, auth=auth, headers=headers)
if "<d:status>HTTP/1.1 200 OK</d:status>" in result:
return True
return False
except:
# log.error(traceback.format_exc())
raise
def add_user_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> bool:
auth = (userid, userpassword)
url = self.davurl + userid + "/" + folder + "?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = self._request("MKCOL", url, auth=auth, headers=headers)
if result == "":
return True
if (
"<s:message>The resource you tried to create already exists</s:message>"
in result
):
raise ProviderItemExists
log.error(result.split("message>")[1].split("<")[0])
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
def exists_user_share_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> Dict[str, str]:
auth = (userid, userpassword)
url = self.shareurl + "shares?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(self._request("GET", url, auth=auth, headers=headers))
if result["ocs"]["meta"]["statuscode"] == 200:
share = [s for s in result["ocs"]["data"] if s["path"] == "/" + folder]
if len(share) >= 1:
# Should we delete all but the first (0) one?
return {"token": share[0]["token"], "url": share[0]["url"]}
raise ProviderItemNotExists
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
def add_user_share_folder(self, userid : str, userpassword : str, folder : str="IsardVDI") -> Dict[str, str]:
auth = (userid, userpassword)
data = {"path": "/" + folder, "shareType": 3}
url = self.shareurl + "shares?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(
self._request("POST", url, data=data, auth=auth, headers=headers)
)
if (
result["ocs"]["meta"]["statuscode"] == 100
or result["ocs"]["meta"]["statuscode"] == 200
):
return {
"token": result["ocs"]["data"]["token"],
"url": result["ocs"]["data"]["url"],
}
log.error(
"Add user share folder error: " + result["ocs"]["meta"]["message"]
)
raise ProviderFolderNotExists
except:
# log.error(traceback.format_exc())
raise
def get_group(self, userid : str) -> None:
pass
def get_groups_list(self) -> List[Any]:
url = self.apiurl + "groups?format=json"
try:
result = json.loads(self._request("GET", url))
if result["ocs"]["meta"]["statuscode"] == 100:
return [g for g in result["ocs"]["data"]["groups"]]
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
def add_group(self, groupid : str) -> bool:
data = {"groupid": groupid}
url = self.apiurl + "groups?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(
self._request("POST", url, data=data, auth=self.auth, headers=headers)
)
if result["ocs"]["meta"]["statuscode"] == 100:
return True
if result["ocs"]["meta"]["statuscode"] == 102:
raise ProviderItemExists
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
# 100 - successful
# 101 - invalid input data
# 102 - group already exists
# 103 - failed to add the group
def delete_group(self, groupid : str) -> bool:
group = urllib.parse.quote(groupid, safe="")
url = self.apiurl + "groups/" + group + "?format=json"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"OCS-APIRequest": "true",
}
try:
result = json.loads(
self._request("DELETE", url, auth=self.auth, headers=headers)
)
if result["ocs"]["meta"]["statuscode"] == 100:
return True
# log.error(traceback.format_exc())
raise ProviderOpError
except:
# log.error(traceback.format_exc())
raise
# 100 - successful
# 101 - invalid input data
# 102 - group already exists
# 103 - failed to add the group
def set_user_mail(self, data : DDUser) -> None:
query = """SELECT * FROM "oc_mail_accounts" WHERE "email" = '%s'"""
sql_query = sql.SQL(query.format(data["email"]))
if not len(self.nextcloud_pg.select(sql_query)):
query = """INSERT INTO "oc_mail_accounts" ("user_id","name","email","inbound_host","inbound_port","inbound_ssl_mode","inbound_user","inbound_password","outbound_host","outbound_port","outbound_ssl_mode","outbound_user","outbound_password") VALUES
('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');"""
account = [
data["user_id"],
data["name"],
data["email"],
data["inbound_host"],
data["inbound_port"],
data["inbound_ssl_mode"],
data["inbound_user"],
data["inbound_password"],
data["outbound_host"],
data["outbound_port"],
data["outbound_ssl_mode"],
data["outbound_user"],
data["outbound_password"],
]
else:
query = """UPDATE "oc_mail_accounts" SET ("user_id","name","email","inbound_host","inbound_port","inbound_ssl_mode","inbound_user","inbound_password","outbound_host","outbound_port","outbound_ssl_mode","outbound_user","outbound_password") =
('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s') WHERE email = '%s';"""
account = [
data["user_id"],
data["name"],
data["email"],
data["inbound_host"],
data["inbound_port"],
data["inbound_ssl_mode"],
data["inbound_user"],
data["inbound_password"],
data["outbound_host"],
data["outbound_port"],
data["outbound_ssl_mode"],
data["outbound_user"],
data["outbound_password"],
data["email"],
]
sql_query = sql.SQL(query.format(",".join([str(acc) for acc in account])))
self.nextcloud_pg.update(sql_query)