2023-02-27 19:38:41 +01:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
import io
|
|
|
|
import pathlib
|
|
|
|
import random
|
|
|
|
import traceback
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
import yaml
|
|
|
|
|
|
|
|
from selenium import webdriver
|
|
|
|
|
|
|
|
# We need selenium 4
|
|
|
|
from selenium.webdriver.common.by import By
|
|
|
|
from selenium.webdriver.support.wait import WebDriverWait
|
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from selenium.webdriver.chrome.options import Options as ChromeOptions
|
|
|
|
|
|
|
|
from typing import Any, Dict, Iterable, List
|
|
|
|
|
|
|
|
|
2023-03-31 10:16:22 +02:00
|
|
|
STEP_TIMEOUT_SECONDS = 40
|
|
|
|
|
|
|
|
|
2023-02-27 19:38:41 +01:00
|
|
|
class DDSession:
|
|
|
|
driver: webdriver.Chrome
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
instance: str,
|
|
|
|
users_file: str,
|
|
|
|
out_dir: pathlib.Path,
|
2023-03-31 10:16:22 +02:00
|
|
|
stepTimeoutSeconds: int = STEP_TIMEOUT_SECONDS,
|
2023-02-27 19:38:41 +01:00
|
|
|
):
|
|
|
|
self.instance = instance
|
|
|
|
self.users_file = [
|
|
|
|
[i.strip() for i in l.split(sep=",", maxsplit=1)]
|
|
|
|
for l in users_file.split("\n")
|
|
|
|
if l
|
|
|
|
]
|
|
|
|
self.out_dir = out_dir
|
|
|
|
self.stepTimeoutSeconds = stepTimeoutSeconds
|
|
|
|
|
|
|
|
if not self.out_dir.is_dir():
|
|
|
|
self.out_dir.mkdir()
|
|
|
|
|
|
|
|
self.start_time = datetime.utcnow()
|
|
|
|
self.last_time = self.start_time
|
|
|
|
self.screenshot_counter = 0
|
|
|
|
self.data: Dict[str, Any] = dict(
|
|
|
|
instance=self.instance, start_time=self.start_time, steps=[]
|
|
|
|
)
|
|
|
|
self.reset()
|
|
|
|
|
|
|
|
def persist(self) -> None:
|
|
|
|
yaml.dump(self.data, (self.out_dir / "session.yml").open("w"))
|
|
|
|
(self.out_dir / "session.html").open("w").write(self.summarise()["html"])
|
|
|
|
|
|
|
|
def summarise(self) -> Dict[str, Any]:
|
|
|
|
return DDSession.summarise_data(self.data)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def summarise_data(data) -> Dict[str, Any]:
|
|
|
|
import copy
|
|
|
|
from operator import getitem
|
|
|
|
from itertools import groupby
|
|
|
|
|
|
|
|
def summarise_item(g: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
|
|
if "substeps" in g[0]:
|
|
|
|
items = [sum((st["step_t"] for st in s["substeps"])) for s in g]
|
|
|
|
else:
|
|
|
|
items = [st["step_t"] for st in g]
|
|
|
|
return {
|
|
|
|
"max": max(items),
|
|
|
|
"count": len(items),
|
|
|
|
"average": sum(items) / len(items),
|
|
|
|
}
|
|
|
|
|
|
|
|
def summarise(d: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
|
|
|
|
dict_data = dict()
|
|
|
|
for k, g in groupby(
|
|
|
|
sorted(d, key=lambda x: getitem(x, "step")),
|
|
|
|
key=lambda x: getitem(x, "step"),
|
|
|
|
):
|
|
|
|
dict_data[k] = summarise_item(list(g))
|
|
|
|
return dict_data
|
|
|
|
|
|
|
|
summary: Dict[str, Any] = dict()
|
|
|
|
summary["overview"] = summarise(data["steps"])
|
|
|
|
|
|
|
|
d = copy.deepcopy(data["steps"])
|
|
|
|
substeps = [
|
|
|
|
dict(st, step=f"{step['step']}: {st['substep']}")
|
|
|
|
for step in data["steps"]
|
|
|
|
for st in step["substeps"]
|
|
|
|
]
|
|
|
|
summary["stepbystep"] = summarise(substeps)
|
|
|
|
|
|
|
|
html = """<html>
|
|
|
|
<head><style>img { margin-bottom: 60px; }</style></head>
|
|
|
|
<body>
|
|
|
|
"""
|
|
|
|
html += f"""
|
|
|
|
<h1>{data['instance']}</h1>
|
|
|
|
<h2>{data['start_time']}</h2>
|
|
|
|
<details><summary>Summary</summary>
|
|
|
|
<pre>{yaml.dump(summary)}
|
|
|
|
</pre>
|
|
|
|
</details>
|
|
|
|
"""
|
|
|
|
for s in data["steps"]:
|
|
|
|
html += f"""<h3>{s['step']}: {s['t']} s (+ {sum((st['step_t'] for st in s['substeps']))} s)</h3>"""
|
|
|
|
for st in s["substeps"]:
|
|
|
|
html += f"""<h4>{s['step']} - {st['substep']}: {st['t']} s (+ {st['step_t']} s)</h4>"""
|
|
|
|
html += f"""<img src="{st['png']}"/>"""
|
|
|
|
html += "</body></html>"
|
|
|
|
summary["html"] = html
|
|
|
|
return summary
|
|
|
|
|
|
|
|
@property
|
|
|
|
def executed_seconds(self) -> float:
|
|
|
|
return (datetime.utcnow() - self.start_time).total_seconds()
|
|
|
|
|
|
|
|
@property
|
|
|
|
def nextcloud_url(self) -> str:
|
|
|
|
return f"https://nextcloud.{self.instance}"
|
|
|
|
|
|
|
|
def reset(self, new_driver: bool = False) -> None:
|
|
|
|
# If needed:
|
|
|
|
# https://stackoverflow.com/a/72922584
|
|
|
|
if getattr(self, "driver", None) is None or new_driver:
|
|
|
|
if getattr(self, "driver", None) is not None:
|
|
|
|
self.driver.quit()
|
|
|
|
options = ChromeOptions()
|
|
|
|
for arg in [
|
|
|
|
"--headless",
|
|
|
|
"--no-sandbox",
|
|
|
|
"--disable-infobars",
|
|
|
|
"--disable-extensions",
|
|
|
|
"--disable-dev-shm-usage",
|
|
|
|
"window-size=1400,600",
|
|
|
|
]:
|
|
|
|
options.add_argument(arg)
|
|
|
|
self.driver = webdriver.Chrome(options=options)
|
|
|
|
msg = "Restart"
|
|
|
|
else:
|
|
|
|
self.driver.delete_all_cookies()
|
|
|
|
msg = "Cleaned cache"
|
|
|
|
self.screenshot("Browser", msg)
|
|
|
|
|
|
|
|
def perform_login(self) -> None:
|
|
|
|
# Wait until Keycloak is shown
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: d.find_elements(By.ID, "kc-form-login")
|
|
|
|
)
|
|
|
|
|
|
|
|
self.screenshot("Login", "Request")
|
|
|
|
|
|
|
|
# Fill in form data for login
|
|
|
|
(username, password) = random.choice(self.users_file)
|
|
|
|
usernameField = self.driver.find_element(By.ID, "username")
|
|
|
|
usernameField.send_keys(username)
|
|
|
|
passwordField = self.driver.find_element(By.ID, "password")
|
|
|
|
passwordField.send_keys(password)
|
|
|
|
loginButton = self.driver.find_element(By.ID, "kc-login")
|
|
|
|
self.screenshot("Login", "Form filled")
|
|
|
|
|
|
|
|
loginButton.click()
|
|
|
|
|
|
|
|
self.screenshot("Login", "Form sent")
|
|
|
|
# TODO: wait for redirection middle step and session.screenshot it
|
|
|
|
|
|
|
|
def load_nextcloud_files(self) -> None:
|
|
|
|
# Start loading
|
|
|
|
self.driver.get(self.nextcloud_url)
|
|
|
|
|
|
|
|
# Wait until Nextcloud with Megamenu is shown or a login is requested
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: (
|
|
|
|
d.find_elements(By.ID, "kc-form-login")
|
|
|
|
+ d.find_elements(By.ID, "menu-apps-icon")
|
2023-03-31 10:16:22 +02:00
|
|
|
+ d.find_elements(By.ID, "menu-apps-btn")
|
2023-02-27 19:38:41 +01:00
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
# Detect whether or not a login is needed
|
|
|
|
login_required = self.driver.find_elements(By.ID, "kc-form-login")
|
|
|
|
if login_required:
|
|
|
|
# Perform login
|
|
|
|
self.perform_login()
|
|
|
|
|
|
|
|
self.screenshot("Nextcloud", "Loading")
|
|
|
|
|
|
|
|
# Wait until Nextcloud files app has loaded
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: d.find_elements(
|
|
|
|
By.CSS_SELECTOR, "#app-content-files td.selection"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
self.screenshot("Nextcloud", "Loaded")
|
|
|
|
|
|
|
|
def open_file_in_onlyoffice(self, file_base_name: str) -> None:
|
|
|
|
# Get and click docx file
|
|
|
|
# This assumes we are on the nextcloud files app with a FILE_BASE file
|
|
|
|
window_count = len(self.driver.window_handles)
|
|
|
|
docxFileLabel = self.driver.find_element(By.PARTIAL_LINK_TEXT, file_base_name)
|
|
|
|
docxFileLabel.click()
|
|
|
|
|
|
|
|
# Switch to OnlyOffice window
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
EC.number_of_windows_to_be(window_count + 1)
|
|
|
|
)
|
|
|
|
child = self.driver.window_handles[window_count]
|
|
|
|
self.driver.switch_to.window(child)
|
|
|
|
|
|
|
|
self.screenshot("OnlyOffice", "Opening")
|
|
|
|
|
|
|
|
# Wait for OnlyOffice to start loading
|
2023-03-31 10:16:22 +02:00
|
|
|
oofCSS = ".app-onlyoffice #app iframe"
|
2023-02-27 19:38:41 +01:00
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: d.find_elements(By.CSS_SELECTOR, oofCSS)
|
|
|
|
)
|
|
|
|
self.screenshot("OnlyOffice", "Loading 1")
|
|
|
|
|
|
|
|
# Switch to its iframe
|
|
|
|
oof = WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: EC.frame_to_be_available_and_switch_to_it(
|
|
|
|
d.find_element(By.CSS_SELECTOR, oofCSS)
|
|
|
|
)(d)
|
|
|
|
)
|
|
|
|
self.screenshot("OnlyOffice", "Loading 2")
|
|
|
|
|
|
|
|
oofLoaded = lambda d: EC.element_to_be_clickable(
|
|
|
|
d.find_element(By.ID, "id-toolbar-btn-save")
|
|
|
|
)(d)
|
|
|
|
|
|
|
|
# Get the first loading screen
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: oofLoaded(d)
|
|
|
|
or d.find_elements(By.CSS_SELECTOR, "#loading-mask div.loader-page")
|
|
|
|
)
|
|
|
|
self.screenshot("OnlyOffice", "Loading 3")
|
|
|
|
|
|
|
|
# Wait for OnlyOffice second loading phase
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
|
|
|
|
lambda d: oofLoaded(d)
|
|
|
|
or d.find_elements(By.CSS_SELECTOR, "div.asc-loadmask-title")
|
|
|
|
)
|
|
|
|
self.screenshot("OnlyOffice", "Loading 4")
|
|
|
|
|
|
|
|
# Wait a for final loading (save button clickable)
|
|
|
|
WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds * 3).until(oofLoaded)
|
|
|
|
self.screenshot("OnlyOffice", "Loaded")
|
|
|
|
|
|
|
|
# Close OnlyOffice window
|
|
|
|
self.driver.close()
|
|
|
|
# And change back to the main window
|
|
|
|
self.driver.switch_to.window(self.driver.window_handles[window_count - 1])
|
|
|
|
self.screenshot("OnlyOffice", "Closed")
|
|
|
|
|
|
|
|
def screenshot(self, step: str, substep_txt: str) -> None:
|
|
|
|
# Get data
|
|
|
|
scr_now = datetime.utcnow()
|
|
|
|
scr_s = (scr_now - self.start_time).total_seconds()
|
|
|
|
step_t = (scr_now - self.last_time).total_seconds()
|
|
|
|
scr_id = self.screenshot_counter
|
|
|
|
|
|
|
|
# Upgrade values
|
|
|
|
self.last_time = scr_now
|
|
|
|
self.screenshot_counter += 1
|
|
|
|
|
|
|
|
# Constants
|
|
|
|
scr_fn = f"{scr_id:04d}.png"
|
|
|
|
scr_path = self.out_dir / scr_fn
|
|
|
|
# Get screenie
|
|
|
|
scr_img = self.driver.get_screenshot_as_png()
|
|
|
|
# Write it
|
|
|
|
open(scr_path, "wb").write(scr_img)
|
|
|
|
# Add to report
|
|
|
|
substep: Dict[str, Any] = dict(
|
|
|
|
id=scr_id, substep=substep_txt, step_t=step_t, t=scr_s, png=scr_fn
|
|
|
|
)
|
|
|
|
if self.data["steps"] and self.data["steps"][-1]["step"] == step:
|
|
|
|
self.data["steps"][-1]["substeps"].append(substep)
|
|
|
|
else:
|
|
|
|
self.data["steps"].append(dict(step=step, t=scr_s, substeps=[substep]))
|
|
|
|
|
|
|
|
|
|
|
|
def randomBool() -> bool:
|
|
|
|
return random.choice([True, False])
|
|
|
|
|
|
|
|
|
|
|
|
def main_test(
|
|
|
|
instance: str,
|
|
|
|
users_file: io.TextIOWrapper,
|
|
|
|
out_dir: pathlib.Path,
|
|
|
|
duration: int,
|
2023-03-31 10:16:22 +02:00
|
|
|
stepTimeoutSeconds: int = STEP_TIMEOUT_SECONDS,
|
2023-02-27 19:38:41 +01:00
|
|
|
) -> DDSession:
|
|
|
|
session = DDSession(
|
|
|
|
instance=instance,
|
|
|
|
users_file=users_file.read(),
|
|
|
|
out_dir=out_dir,
|
2023-03-31 10:16:22 +02:00
|
|
|
stepTimeoutSeconds=stepTimeoutSeconds,
|
2023-02-27 19:38:41 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
first_run = True
|
|
|
|
while session.executed_seconds < duration:
|
|
|
|
# 50% chance of requiring login reseting cookies
|
|
|
|
# First run always requires login, skip reset
|
|
|
|
if not first_run and randomBool():
|
|
|
|
# Possibly driver too
|
|
|
|
# 25% chance of cleaning all cache and reopening the browser
|
|
|
|
session.reset(new_driver=randomBool())
|
|
|
|
first_run = False
|
|
|
|
|
|
|
|
try:
|
|
|
|
# Load nextcloud files
|
|
|
|
session.load_nextcloud_files()
|
|
|
|
# Open file in OnlyOffice
|
|
|
|
session.open_file_in_onlyoffice("template_1")
|
|
|
|
except Exception as ex:
|
|
|
|
session.screenshot("Exception", str(ex))
|
|
|
|
print(traceback.format_exc())
|
|
|
|
session.persist()
|
|
|
|
return session
|
|
|
|
|
|
|
|
|
|
|
|
def main_summary(filename: io.TextIOWrapper) -> None:
|
|
|
|
data = yaml.safe_load(filename)
|
|
|
|
summary = DDSession.summarise_data(data)
|
|
|
|
html_fn = pathlib.Path(filename.name).with_suffix(".html")
|
|
|
|
open(html_fn, "w").write(summary["html"])
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
import pathlib
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
|
|
prog="DD Selenium tester", description="Run basic UI tests on DD"
|
|
|
|
)
|
|
|
|
subparsers = parser.add_subparsers(required=True)
|
|
|
|
test_parser = subparsers.add_parser("test")
|
|
|
|
|
|
|
|
test_parser.add_argument("instance")
|
|
|
|
test_parser.add_argument(
|
|
|
|
"-u",
|
|
|
|
"--users-file",
|
|
|
|
default="dd-stress-test.users.csv",
|
|
|
|
type=argparse.FileType("r"),
|
|
|
|
)
|
|
|
|
test_parser.add_argument("-d", "--duration", default=300, type=int)
|
|
|
|
test_parser.add_argument("-o", "--out-dir", default="results", type=pathlib.Path)
|
|
|
|
|
|
|
|
summary_parser = subparsers.add_parser("summary")
|
|
|
|
summary_parser.add_argument(
|
|
|
|
"filename", default="session.yml", type=argparse.FileType("r")
|
|
|
|
)
|
|
|
|
|
|
|
|
ns = parser.parse_args()
|
|
|
|
if "instance" in ns:
|
|
|
|
main_test(**vars(ns))
|
|
|
|
else:
|
|
|
|
main_summary(**vars(ns))
|